Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use const& for the parameters of type std::string not affected by previous commit.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 2)
171   size = parse_double(action[2]);
172   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173   if (action.size() > 4)
174     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
175 }
176
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
178 {
179   CHECK_ACTION_PARAMS(action, 2, 2)
180   comm_size = parse_double(action[2]);
181   comp_size = parse_double(action[3]);
182   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
183   if (action.size() > 5)
184     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
185 }
186
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
188 {
189   CHECK_ACTION_PARAMS(action, 2, 1)
190   comm_size = parse_double(action[2]);
191   comp_size = parse_double(action[3]);
192   if (action.size() > 4)
193     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
194 }
195
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
197 {
198   CHECK_ACTION_PARAMS(action, 2, 1)
199   comm_size = MPI_COMM_WORLD->size();
200   send_size = parse_double(action[2]);
201   recv_size = parse_double(action[3]);
202
203   if (action.size() > 4)
204     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205   if (action.size() > 5)
206     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
207 }
208
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
210 {
211   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
212         0 gather 68 68 0 0 0
213       where:
214         1) 68 is the sendcounts
215         2) 68 is the recvcounts
216         3) 0 is the root node
217         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
219   */
220   CHECK_ACTION_PARAMS(action, 2, 3)
221   comm_size = MPI_COMM_WORLD->size();
222   send_size = parse_double(action[2]);
223   recv_size = parse_double(action[3]);
224
225   if (name == "gather") {
226     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
227     if (action.size() > 5)
228       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229     if (action.size() > 6)
230       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
231   } else {
232     if (action.size() > 4)
233       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
234     if (action.size() > 5)
235       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
236   }
237 }
238
239 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
240 {
241   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
242        0 gather 68 68 10 10 10 0 0 0
243      where:
244        1) 68 is the sendcount
245        2) 68 10 10 10 is the recvcounts
246        3) 0 is the root node
247        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
249   */
250   comm_size = MPI_COMM_WORLD->size();
251   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
252   send_size  = parse_double(action[2]);
253   disps      = std::vector<int>(comm_size, 0);
254   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
255
256   if (name == "gatherv") {
257     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
258     if (action.size() > 4 + comm_size)
259       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
260     if (action.size() > 5 + comm_size)
261       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
262   } else {
263     int datatype_index = 0;
264     int disp_index     = 0;
265     /* The 3 comes from "0 gather <sendcount>", which must always be present.
266      * The + comm_size is the recvcounts array, which must also be present
267      */
268     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
269       datatype_index = 3 + comm_size;
270       disp_index     = datatype_index + 1;
271       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
272       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
273     } else if (action.size() >
274                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
275       disp_index = 3 + comm_size;
276     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
277       datatype_index = 3 + comm_size;
278       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
279       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
280     }
281
282     if (disp_index != 0) {
283       for (unsigned int i = 0; i < comm_size; i++)
284         disps[i]          = std::stoi(action[disp_index + i]);
285     }
286   }
287
288   for (unsigned int i = 0; i < comm_size; i++) {
289     (*recvcounts)[i] = std::stoi(action[i + 3]);
290   }
291   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
292 }
293
294 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
295 {
296   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
297         0 gather 68 68 0 0 0
298       where:
299         1) 68 is the sendcounts
300         2) 68 is the recvcounts
301         3) 0 is the root node
302         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304   */
305   CHECK_ACTION_PARAMS(action, 2, 3)
306   comm_size = MPI_COMM_WORLD->size();
307   send_size = parse_double(action[2]);
308   recv_size = parse_double(action[3]);
309   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
310   if (action.size() > 5)
311     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
312   if (action.size() > 6)
313     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
314 }
315
316 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
317 {
318   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
319      0 gather 68 10 10 10 68 0 0 0
320       where:
321       1) 68 10 10 10 is the sendcounts
322       2) 68 is the recvcount
323       3) 0 is the root node
324       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
326   */
327   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
328   recv_size  = parse_double(action[2 + comm_size]);
329   disps      = std::vector<int>(comm_size, 0);
330   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
331
332   if (action.size() > 5 + comm_size)
333     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
334   if (action.size() > 5 + comm_size)
335     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
336
337   for (unsigned int i = 0; i < comm_size; i++) {
338     (*sendcounts)[i] = std::stoi(action[i + 2]);
339   }
340   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
341   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
342 }
343
344 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
345 {
346   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
347        0 reducescatter 275427 275427 275427 204020 11346849 0
348      where:
349        1) The first four values after the name of the action declare the recvcounts array
350        2) The value 11346849 is the amount of instructions
351        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
352   */
353   comm_size = MPI_COMM_WORLD->size();
354   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
355   comp_size  = parse_double(action[2 + comm_size]);
356   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
357   if (action.size() > 3 + comm_size)
358     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
359
360   for (unsigned int i = 0; i < comm_size; i++) {
361     recvcounts->push_back(std::stoi(action[i + 2]));
362   }
363   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
364 }
365
366 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
367 {
368   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
369         0 alltoallv 100 1 7 10 12 100 1 70 10 5
370      where:
371       1) 100 is the size of the send buffer *sizeof(int),
372       2) 1 7 10 12 is the sendcounts array
373       3) 100*sizeof(int) is the size of the receiver buffer
374       4)  1 70 10 5 is the recvcounts array
375   */
376   comm_size = MPI_COMM_WORLD->size();
377   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
378   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380   senddisps  = std::vector<int>(comm_size, 0);
381   recvdisps  = std::vector<int>(comm_size, 0);
382
383   if (action.size() > 5 + 2 * comm_size)
384     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
385   if (action.size() > 5 + 2 * comm_size)
386     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
387
388   send_buf_size = parse_double(action[2]);
389   recv_buf_size = parse_double(action[3 + comm_size]);
390   for (unsigned int i = 0; i < comm_size; i++) {
391     (*sendcounts)[i] = std::stoi(action[3 + i]);
392     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
393   }
394   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
395   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
396 }
397
398 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
399 {
400   std::string s = boost::algorithm::join(action, " ");
401   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
402   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
403   req_storage.remove(request);
404
405   if (request == MPI_REQUEST_NULL) {
406     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
407      * return.*/
408     return;
409   }
410
411   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
412
413   // Must be taken before Request::wait() since the request may be set to
414   // MPI_REQUEST_NULL by Request::wait!
415   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
416   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
417   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
418
419   MPI_Status status;
420   Request::wait(&request, &status);
421
422   TRACE_smpi_comm_out(rank);
423   if (is_wait_for_receive)
424     TRACE_smpi_recv(args.src, args.dst, args.tag);
425 }
426
427 void SendAction::kernel(simgrid::xbt::ReplayAction&)
428 {
429   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
430
431   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
432         args.tag, Datatype::encode(args.datatype1)));
433   if (not TRACE_smpi_view_internals())
434     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
435
436   if (name == "send") {
437     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
438   } else if (name == "isend") {
439     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
440     req_storage.add(request);
441   } else {
442     xbt_die("Don't know this action, %s", name.c_str());
443   }
444
445   TRACE_smpi_comm_out(my_proc_id);
446 }
447
448 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
449 {
450   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
451         args.tag, Datatype::encode(args.datatype1)));
452
453   MPI_Status status;
454   // unknown size from the receiver point of view
455   if (args.size <= 0.0) {
456     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
457     args.size = status.count;
458   }
459
460   bool is_recv = false; // Help analyzers understanding that status is not used unintialized
461   if (name == "recv") {
462     is_recv = true;
463     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
464   } else if (name == "irecv") {
465     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
466     req_storage.add(request);
467   } else {
468     THROW_IMPOSSIBLE;
469   }
470
471   TRACE_smpi_comm_out(my_proc_id);
472   if (is_recv && not TRACE_smpi_view_internals()) {
473     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
474     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
475   }
476 }
477
478 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
479 {
480   TRACE_smpi_computing_in(my_proc_id, args.flops);
481   smpi_execute_flops(args.flops);
482   TRACE_smpi_computing_out(my_proc_id);
483 }
484
485 void TestAction::kernel(simgrid::xbt::ReplayAction&)
486 {
487   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
488   req_storage.remove(request);
489   // if request is null here, this may mean that a previous test has succeeded
490   // Different times in traced application and replayed version may lead to this
491   // In this case, ignore the extra calls.
492   if (request != MPI_REQUEST_NULL) {
493     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
494
495     MPI_Status status;
496     int flag = Request::test(&request, &status);
497
498     XBT_DEBUG("MPI_Test result: %d", flag);
499     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
500      * nullptr.*/
501     if (request == MPI_REQUEST_NULL)
502       req_storage.addNullRequest(args.src, args.dst, args.tag);
503     else
504       req_storage.add(request);
505
506     TRACE_smpi_comm_out(my_proc_id);
507   }
508 }
509
510 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
511 {
512   CHECK_ACTION_PARAMS(action, 0, 1)
513     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
514     : MPI_BYTE;  // default TAU datatype
515
516   /* start a simulated timer */
517   smpi_process()->simulated_start();
518 }
519
520 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
521 {
522   /* nothing to do */
523 }
524
525 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
526 {
527   const unsigned int count_requests = req_storage.size();
528
529   if (count_requests > 0) {
530     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
531     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
532     std::vector<MPI_Request> reqs;
533     req_storage.get_requests(reqs);
534     for (const auto& req : reqs) {
535       if (req && (req->flags() & MPI_REQ_RECV)) {
536         sender_receiver.push_back({req->src(), req->dst()});
537       }
538     }
539     MPI_Status status[count_requests];
540     Request::waitall(count_requests, &(reqs.data())[0], status);
541     req_storage.get_store().clear();
542
543     for (auto& pair : sender_receiver) {
544       TRACE_smpi_recv(pair.first, pair.second, 0);
545     }
546     TRACE_smpi_comm_out(my_proc_id);
547   }
548 }
549
550 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
551 {
552   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
553   Colls::barrier(MPI_COMM_WORLD);
554   TRACE_smpi_comm_out(my_proc_id);
555 }
556
557 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
558 {
559   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
560       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
561         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
562
563   Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
564
565   TRACE_smpi_comm_out(my_proc_id);
566 }
567
568 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
569 {
570   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
571       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
572         args.comp_size, args.comm_size, -1,
573         Datatype::encode(args.datatype1), ""));
574
575   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
576       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
577   smpi_execute_flops(args.comp_size);
578
579   TRACE_smpi_comm_out(my_proc_id);
580 }
581
582 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
583 {
584   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
585         Datatype::encode(args.datatype1), ""));
586
587   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
588       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
589   smpi_execute_flops(args.comp_size);
590
591   TRACE_smpi_comm_out(my_proc_id);
592 }
593
594 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
595 {
596   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
597       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
598         Datatype::encode(args.datatype1),
599         Datatype::encode(args.datatype2)));
600
601   Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
602       args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
603       args.recv_size, args.datatype2, MPI_COMM_WORLD);
604
605   TRACE_smpi_comm_out(my_proc_id);
606 }
607
608 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
609 {
610   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
611         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
612
613   if (name == "gather") {
614     int rank = MPI_COMM_WORLD->rank();
615     Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
616         (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
617   }
618   else
619     Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
620         recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
621
622   TRACE_smpi_comm_out(my_proc_id);
623 }
624
625 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
626 {
627   int rank = MPI_COMM_WORLD->rank();
628
629   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
630         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
631         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
632
633   if (name == "gatherv") {
634     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
635         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
636         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
637   }
638   else {
639     Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
640         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
641         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
642   }
643
644   TRACE_smpi_comm_out(my_proc_id);
645 }
646
647 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
648 {
649   int rank = MPI_COMM_WORLD->rank();
650   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
651         Datatype::encode(args.datatype1),
652         Datatype::encode(args.datatype2)));
653
654   Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
655       (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
656
657   TRACE_smpi_comm_out(my_proc_id);
658 }
659
660 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
661 {
662   int rank = MPI_COMM_WORLD->rank();
663   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
664         nullptr, Datatype::encode(args.datatype1),
665         Datatype::encode(args.datatype2)));
666
667   Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
668       args.sendcounts->data(), args.disps.data(), args.datatype1,
669       recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
670       MPI_COMM_WORLD);
671
672   TRACE_smpi_comm_out(my_proc_id);
673 }
674
675 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
676 {
677   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
678       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
679         std::to_string(args.comp_size), /* ugly hack to print comp_size */
680         Datatype::encode(args.datatype1)));
681
682   Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
683       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
684       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
685
686   smpi_execute_flops(args.comp_size);
687   TRACE_smpi_comm_out(my_proc_id);
688 }
689
690 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
691 {
692   TRACE_smpi_comm_in(my_proc_id, __func__,
693       new simgrid::instr::VarCollTIData(
694         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
695         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
696
697   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
698       recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
699
700   TRACE_smpi_comm_out(my_proc_id);
701 }
702 } // Replay Namespace
703 }} // namespace simgrid::smpi
704
705 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
706 /** @brief Only initialize the replay, don't do it for real */
707 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
708 {
709   if (not smpi_process()->initializing()){
710     simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
711     simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
712     simgrid::smpi::ActorExt::init();
713   }
714   smpi_process()->mark_as_initialized();
715   smpi_process()->set_replaying(true);
716
717   int my_proc_id = simgrid::s4u::this_actor::get_pid();
718
719   TRACE_smpi_init(my_proc_id);
720   TRACE_smpi_computing_init(my_proc_id);
721   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
722   TRACE_smpi_comm_out(my_proc_id);
723   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
724   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
725   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
726   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
727   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
728   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
729   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
730   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
731   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
732   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
733   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
734   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
735   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
736   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
737   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
738   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
739   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
740   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
741   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
742   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
743   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
744   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
745   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
746   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
747   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
748   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
749
750   //if we have a delayed start, sleep here.
751   if (start_delay_flops > 0) {
752     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
753     smpi_execute_flops(start_delay_flops);
754   } else {
755     // Wait for the other actors to initialize also
756     simgrid::s4u::this_actor::yield();
757   }
758 }
759
760 /** @brief actually run the replay after initialization */
761 void smpi_replay_main(int rank, const char* trace_filename)
762 {
763   static int active_processes = 0;
764   active_processes++;
765   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
766   std::string rank_string                      = std::to_string(rank);
767   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
768
769   /* and now, finalize everything */
770   /* One active process will stop. Decrease the counter*/
771   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
772   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
773   if (count_requests > 0) {
774     MPI_Request requests[count_requests];
775     MPI_Status status[count_requests];
776     unsigned int i=0;
777
778     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
779       requests[i] = pair.second;
780       i++;
781     }
782     simgrid::smpi::Request::waitall(count_requests, requests, status);
783   }
784   active_processes--;
785
786   if(active_processes==0){
787     /* Last process alive speaking: end the simulated timer */
788     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
789     smpi_free_replay_tmp_buffers();
790   }
791
792   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
793                      new simgrid::instr::NoOpTIData("finalize"));
794
795   smpi_process()->finalize();
796
797   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
798   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
799 }
800
801 /** @brief chain a replay initialization and a replay start */
802 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
803 {
804   smpi_replay_init(instance_id, rank, start_delay_flops);
805   smpi_replay_main(rank, trace_filename);
806 }