Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Enable usage of tags
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <sstream>
20 #include <vector>
21
22 using simgrid::s4u::Actor;
23
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2)) {                                                     \
33       std::stringstream ss; \
34       for (const auto& elem : action) { \
35         ss << elem << " "; \
36       } \
37       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
38                            "%zu items were given on the line. First two should be process_id and action.  "            \
39                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
40                            "The full line that was given is:\n   %s\n" \
41                            "Please contact the Simgrid team if support is needed",                                     \
42              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional), ss.str().c_str());    \
43     }\
44   }
45
46 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
47 {
48   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
49     std::string s = boost::algorithm::join(action, " ");
50     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
51   }
52 }
53
54 static std::vector<MPI_Request>* get_reqq_self()
55 {
56   return reqq.at(simgrid::s4u::this_actor::getPid());
57 }
58
59 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
60 {
61   reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
62 }
63
64 /* Helper function */
65 static double parse_double(std::string string)
66 {
67   return xbt_str_parse_double(string.c_str(), "%s is not a double");
68 }
69
70 namespace simgrid {
71 namespace smpi {
72
73 namespace replay {
74 class ActionArgParser {
75 public:
76   virtual ~ActionArgParser() = default;
77   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
78 };
79
80 class SendRecvParser : public ActionArgParser {
81 public:
82   /* communication partner; if we send, this is the receiver and vice versa */
83   int partner;
84   double size;
85   int tag;
86   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
87
88   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
89   {
90     CHECK_ACTION_PARAMS(action, 3, 1)
91     partner = std::stoi(action[2]);
92     tag     = std::stoi(action[3]);
93     size    = parse_double(action[4]);
94     if (action.size() > 5)
95       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
96   }
97 };
98
99 class ComputeParser : public ActionArgParser {
100 public:
101   /* communication partner; if we send, this is the receiver and vice versa */
102   double flops;
103
104   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
105   {
106     CHECK_ACTION_PARAMS(action, 1, 0)
107     flops = parse_double(action[2]);
108   }
109 };
110
111 class CollCommParser : public ActionArgParser {
112 public:
113   double size;
114   double comm_size;
115   double comp_size;
116   int send_size;
117   int recv_size;
118   int root = 0;
119   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
120   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
121 };
122
123 class BcastArgParser : public CollCommParser {
124 public:
125   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
126   {
127     CHECK_ACTION_PARAMS(action, 1, 2)
128     size = parse_double(action[2]);
129     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
130     if (action.size() > 4)
131       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
132   }
133 };
134
135 class ReduceArgParser : public CollCommParser {
136 public:
137   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
138   {
139     CHECK_ACTION_PARAMS(action, 2, 2)
140     comm_size = parse_double(action[2]);
141     comp_size = parse_double(action[3]);
142     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
143     if (action.size() > 5)
144       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
145   }
146 };
147
148 class AllReduceArgParser : public CollCommParser {
149 public:
150   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
151   {
152     CHECK_ACTION_PARAMS(action, 2, 1)
153     comm_size = parse_double(action[2]);
154     comp_size = parse_double(action[3]);
155     if (action.size() > 4)
156       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
157   }
158 };
159
160 class AllToAllArgParser : public CollCommParser {
161 public:
162   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
163   {
164     CHECK_ACTION_PARAMS(action, 2, 1)
165     comm_size = MPI_COMM_WORLD->size();
166     send_size = parse_double(action[2]);
167     recv_size = parse_double(action[3]);
168
169     if (action.size() > 4)
170       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
171     if (action.size() > 5)
172       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
173   }
174 };
175
176 class GatherArgParser : public CollCommParser {
177 public:
178   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
179   {
180     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
181           0 gather 68 68 0 0 0
182         where:
183           1) 68 is the sendcounts
184           2) 68 is the recvcounts
185           3) 0 is the root node
186           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
187           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
188     */
189     CHECK_ACTION_PARAMS(action, 2, 3)
190     comm_size = MPI_COMM_WORLD->size();
191     send_size = parse_double(action[2]);
192     recv_size = parse_double(action[3]);
193
194     if (name == "gather") {
195       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
196       if (action.size() > 5)
197         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
198       if (action.size() > 6)
199         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
200     }
201     else {
202       if (action.size() > 4)
203         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
204       if (action.size() > 5)
205         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
206     }
207   }
208 };
209
210 class GatherVArgParser : public CollCommParser {
211 public:
212   int recv_size_sum;
213   std::shared_ptr<std::vector<int>> recvcounts;
214   std::vector<int> disps;
215   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
216   {
217     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
218          0 gather 68 68 10 10 10 0 0 0
219        where:
220          1) 68 is the sendcount
221          2) 68 10 10 10 is the recvcounts
222          3) 0 is the root node
223          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
224          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
225     */
226     comm_size = MPI_COMM_WORLD->size();
227     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
228     send_size = parse_double(action[2]);
229     disps     = std::vector<int>(comm_size, 0);
230     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
231
232     if (name == "gatherV") {
233       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
234       if (action.size() > 4 + comm_size)
235         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
236       if (action.size() > 5 + comm_size)
237         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
238     }
239     else {
240       int datatype_index = 0;
241       int disp_index     = 0;
242       /* The 3 comes from "0 gather <sendcount>", which must always be present.
243        * The + comm_size is the recvcounts array, which must also be present
244        */
245       if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
246         datatype_index = 3 + comm_size;
247         disp_index     = datatype_index + 1;
248         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
249         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
250       } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
251         disp_index     = 3 + comm_size;
252       } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
253         datatype_index = 3 + comm_size;
254         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
255         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
256       }
257
258       if (disp_index != 0) {
259         for (unsigned int i = 0; i < comm_size; i++)
260           disps[i]          = std::stoi(action[disp_index + i]);
261       }
262     }
263
264     for (unsigned int i = 0; i < comm_size; i++) {
265       (*recvcounts)[i] = std::stoi(action[i + 3]);
266     }
267     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
268   }
269 };
270
271 class ScatterArgParser : public CollCommParser {
272 public:
273   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
274   {
275     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
276           0 gather 68 68 0 0 0
277         where:
278           1) 68 is the sendcounts
279           2) 68 is the recvcounts
280           3) 0 is the root node
281           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
282           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
283     */
284     CHECK_ACTION_PARAMS(action, 2, 3)
285     comm_size   = MPI_COMM_WORLD->size();
286     send_size   = parse_double(action[2]);
287     recv_size   = parse_double(action[3]);
288     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
289     if (action.size() > 5)
290       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
291     if (action.size() > 6)
292       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
293   }
294 };
295
296 class ScatterVArgParser : public CollCommParser {
297 public:
298   int recv_size_sum;
299   int send_size_sum;
300   std::shared_ptr<std::vector<int>> sendcounts;
301   std::vector<int> disps;
302   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
303   {
304     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
305        0 gather 68 10 10 10 68 0 0 0
306         where:
307         1) 68 10 10 10 is the sendcounts
308         2) 68 is the recvcount
309         3) 0 is the root node
310         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
311         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
312     */
313     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
314     recv_size  = parse_double(action[2 + comm_size]);
315     disps      = std::vector<int>(comm_size, 0);
316     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
317
318     if (action.size() > 5 + comm_size)
319       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
320     if (action.size() > 5 + comm_size)
321       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
322
323     for (unsigned int i = 0; i < comm_size; i++) {
324       (*sendcounts)[i] = std::stoi(action[i + 2]);
325     }
326     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
327     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
328   }
329 };
330
331 class ReduceScatterArgParser : public CollCommParser {
332 public:
333   int recv_size_sum;
334   std::shared_ptr<std::vector<int>> recvcounts;
335   std::vector<int> disps;
336   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
337   {
338     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
339          0 reduceScatter 275427 275427 275427 204020 11346849 0
340        where:
341          1) The first four values after the name of the action declare the recvcounts array
342          2) The value 11346849 is the amount of instructions
343          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
344     */
345     comm_size = MPI_COMM_WORLD->size();
346     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
347     comp_size = parse_double(action[2+comm_size]);
348     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
349     if (action.size() > 3 + comm_size)
350       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
351
352     for (unsigned int i = 0; i < comm_size; i++) {
353       recvcounts->push_back(std::stoi(action[i + 2]));
354     }
355     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
356   }
357 };
358
359 class AllToAllVArgParser : public CollCommParser {
360 public:
361   int recv_size_sum;
362   int send_size_sum;
363   std::shared_ptr<std::vector<int>> recvcounts;
364   std::shared_ptr<std::vector<int>> sendcounts;
365   std::vector<int> senddisps;
366   std::vector<int> recvdisps;
367   int send_buf_size;
368   int recv_buf_size;
369   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
370   {
371     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
372           0 allToAllV 100 1 7 10 12 100 1 70 10 5
373        where:
374         1) 100 is the size of the send buffer *sizeof(int),
375         2) 1 7 10 12 is the sendcounts array
376         3) 100*sizeof(int) is the size of the receiver buffer
377         4)  1 70 10 5 is the recvcounts array
378     */
379     comm_size = MPI_COMM_WORLD->size();
380     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
381     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
382     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
383     senddisps  = std::vector<int>(comm_size, 0);
384     recvdisps  = std::vector<int>(comm_size, 0);
385
386     if (action.size() > 5 + 2 * comm_size)
387       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
388     if (action.size() > 5 + 2 * comm_size)
389       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
390
391     send_buf_size=parse_double(action[2]);
392     recv_buf_size=parse_double(action[3+comm_size]);
393     for (unsigned int i = 0; i < comm_size; i++) {
394       (*sendcounts)[i] = std::stoi(action[3 + i]);
395       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
396     }
397     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
398     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
399   }
400 };
401
402 template <class T> class ReplayAction {
403 protected:
404   const std::string name;
405   const int my_proc_id;
406   T args;
407
408 public:
409   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
410   virtual ~ReplayAction() = default;
411
412   virtual void execute(simgrid::xbt::ReplayAction& action)
413   {
414     // Needs to be re-initialized for every action, hence here
415     double start_time = smpi_process()->simulated_elapsed();
416     args.parse(action, name);
417     kernel(action);
418     if (name != "Init")
419       log_timed_action(action, start_time);
420   }
421
422   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
423
424   void* send_buffer(int size)
425   {
426     return smpi_get_tmp_sendbuffer(size);
427   }
428
429   void* recv_buffer(int size)
430   {
431     return smpi_get_tmp_recvbuffer(size);
432   }
433 };
434
435 class WaitAction : public ReplayAction<ActionArgParser> {
436 public:
437   WaitAction() : ReplayAction("Wait") {}
438   void kernel(simgrid::xbt::ReplayAction& action) override
439   {
440     std::string s = boost::algorithm::join(action, " ");
441     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
442     MPI_Request request = get_reqq_self()->back();
443     get_reqq_self()->pop_back();
444
445     if (request == nullptr) {
446       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
447        * return.*/
448       return;
449     }
450
451     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
452
453     // Must be taken before Request::wait() since the request may be set to
454     // MPI_REQUEST_NULL by Request::wait!
455     int src                  = request->comm()->group()->rank(request->src());
456     int dst                  = request->comm()->group()->rank(request->dst());
457     int tag                  = request->tag();
458     bool is_wait_for_receive = (request->flags() & RECV);
459     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
460     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
461
462     MPI_Status status;
463     Request::wait(&request, &status);
464
465     TRACE_smpi_comm_out(rank);
466     if (is_wait_for_receive)
467       TRACE_smpi_recv(src, dst, tag);
468   }
469 };
470
471 class SendAction : public ReplayAction<SendRecvParser> {
472 public:
473   SendAction() = delete;
474   explicit SendAction(std::string name) : ReplayAction(name) {}
475   void kernel(simgrid::xbt::ReplayAction& action) override
476   {
477     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
478
479     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
480                                                                              args.tag, Datatype::encode(args.datatype1)));
481     if (not TRACE_smpi_view_internals())
482       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
483
484     if (name == "send") {
485       Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
486     } else if (name == "Isend") {
487       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
488       get_reqq_self()->push_back(request);
489     } else {
490       xbt_die("Don't know this action, %s", name.c_str());
491     }
492
493     TRACE_smpi_comm_out(my_proc_id);
494   }
495 };
496
497 class RecvAction : public ReplayAction<SendRecvParser> {
498 public:
499   RecvAction() = delete;
500   explicit RecvAction(std::string name) : ReplayAction(name) {}
501   void kernel(simgrid::xbt::ReplayAction& action) override
502   {
503     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
504
505     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
506                                                                              args.tag, Datatype::encode(args.datatype1)));
507
508     MPI_Status status;
509     // unknown size from the receiver point of view
510     if (args.size <= 0.0) {
511       Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
512       args.size = status.count;
513     }
514
515     if (name == "recv") {
516       Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
517     } else if (name == "Irecv") {
518       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
519       get_reqq_self()->push_back(request);
520     }
521
522     TRACE_smpi_comm_out(my_proc_id);
523     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
524     if (name == "recv" && not TRACE_smpi_view_internals()) {
525       TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
526     }
527   }
528 };
529
530 class ComputeAction : public ReplayAction<ComputeParser> {
531 public:
532   ComputeAction() : ReplayAction("compute") {}
533   void kernel(simgrid::xbt::ReplayAction& action) override
534   {
535     TRACE_smpi_computing_in(my_proc_id, args.flops);
536     smpi_execute_flops(args.flops);
537     TRACE_smpi_computing_out(my_proc_id);
538   }
539 };
540
541 class TestAction : public ReplayAction<ActionArgParser> {
542 public:
543   TestAction() : ReplayAction("Test") {}
544   void kernel(simgrid::xbt::ReplayAction& action) override
545   {
546     MPI_Request request = get_reqq_self()->back();
547     get_reqq_self()->pop_back();
548     // if request is null here, this may mean that a previous test has succeeded
549     // Different times in traced application and replayed version may lead to this
550     // In this case, ignore the extra calls.
551     if (request != nullptr) {
552       TRACE_smpi_testing_in(my_proc_id);
553
554       MPI_Status status;
555       int flag = Request::test(&request, &status);
556
557       XBT_DEBUG("MPI_Test result: %d", flag);
558       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
559        * nullptr.*/
560       get_reqq_self()->push_back(request);
561
562       TRACE_smpi_testing_out(my_proc_id);
563     }
564   }
565 };
566
567 class InitAction : public ReplayAction<ActionArgParser> {
568 public:
569   InitAction() : ReplayAction("Init") {}
570   void kernel(simgrid::xbt::ReplayAction& action) override
571   {
572     CHECK_ACTION_PARAMS(action, 0, 1)
573     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
574                                            : MPI_BYTE;  // default TAU datatype
575
576     /* start a simulated timer */
577     smpi_process()->simulated_start();
578     set_reqq_self(new std::vector<MPI_Request>);
579   }
580 };
581
582 class CommunicatorAction : public ReplayAction<ActionArgParser> {
583 public:
584   CommunicatorAction() : ReplayAction("Comm") {}
585   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
586 };
587
588 class WaitAllAction : public ReplayAction<ActionArgParser> {
589 public:
590   WaitAllAction() : ReplayAction("waitAll") {}
591   void kernel(simgrid::xbt::ReplayAction& action) override
592   {
593     const unsigned int count_requests = get_reqq_self()->size();
594
595     if (count_requests > 0) {
596       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
597       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
598       for (const auto& req : (*get_reqq_self())) {
599         if (req && (req->flags() & RECV)) {
600           sender_receiver.push_back({req->src(), req->dst()});
601         }
602       }
603       MPI_Status status[count_requests];
604       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
605
606       for (auto& pair : sender_receiver) {
607         TRACE_smpi_recv(pair.first, pair.second, 0);
608       }
609       TRACE_smpi_comm_out(my_proc_id);
610     }
611   }
612 };
613
614 class BarrierAction : public ReplayAction<ActionArgParser> {
615 public:
616   BarrierAction() : ReplayAction("barrier") {}
617   void kernel(simgrid::xbt::ReplayAction& action) override
618   {
619     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
620     Colls::barrier(MPI_COMM_WORLD);
621     TRACE_smpi_comm_out(my_proc_id);
622   }
623 };
624
625 class BcastAction : public ReplayAction<BcastArgParser> {
626 public:
627   BcastAction() : ReplayAction("bcast") {}
628   void kernel(simgrid::xbt::ReplayAction& action) override
629   {
630     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
631                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
632                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
633
634     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
635
636     TRACE_smpi_comm_out(my_proc_id);
637   }
638 };
639
640 class ReduceAction : public ReplayAction<ReduceArgParser> {
641 public:
642   ReduceAction() : ReplayAction("reduce") {}
643   void kernel(simgrid::xbt::ReplayAction& action) override
644   {
645     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
646                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
647                                                       args.comp_size, args.comm_size, -1,
648                                                       Datatype::encode(args.datatype1), ""));
649
650     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
651         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
652     smpi_execute_flops(args.comp_size);
653
654     TRACE_smpi_comm_out(my_proc_id);
655   }
656 };
657
658 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
659 public:
660   AllReduceAction() : ReplayAction("allReduce") {}
661   void kernel(simgrid::xbt::ReplayAction& action) override
662   {
663     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
664                                                                                 Datatype::encode(args.datatype1), ""));
665
666     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
667         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
668     smpi_execute_flops(args.comp_size);
669
670     TRACE_smpi_comm_out(my_proc_id);
671   }
672 };
673
674 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
675 public:
676   AllToAllAction() : ReplayAction("allToAll") {}
677   void kernel(simgrid::xbt::ReplayAction& action) override
678   {
679     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
680                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
681                                                     Datatype::encode(args.datatype1),
682                                                     Datatype::encode(args.datatype2)));
683
684     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
685                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
686                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
687
688     TRACE_smpi_comm_out(my_proc_id);
689   }
690 };
691
692 class GatherAction : public ReplayAction<GatherArgParser> {
693 public:
694   explicit GatherAction(std::string name) : ReplayAction(name) {}
695   void kernel(simgrid::xbt::ReplayAction& action) override
696   {
697     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
698                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
699
700     if (name == "gather") {
701       int rank = MPI_COMM_WORLD->rank();
702       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
703                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
704     }
705     else
706       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
707                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
708
709     TRACE_smpi_comm_out(my_proc_id);
710   }
711 };
712
713 class GatherVAction : public ReplayAction<GatherVArgParser> {
714 public:
715   explicit GatherVAction(std::string name) : ReplayAction(name) {}
716   void kernel(simgrid::xbt::ReplayAction& action) override
717   {
718     int rank = MPI_COMM_WORLD->rank();
719
720     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
721                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
722                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
723
724     if (name == "gatherV") {
725       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
726                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
727                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
728     }
729     else {
730       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
731                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
732                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
733     }
734
735     TRACE_smpi_comm_out(my_proc_id);
736   }
737 };
738
739 class ScatterAction : public ReplayAction<ScatterArgParser> {
740 public:
741   ScatterAction() : ReplayAction("scatter") {}
742   void kernel(simgrid::xbt::ReplayAction& action) override
743   {
744     int rank = MPI_COMM_WORLD->rank();
745     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
746                                                                           Datatype::encode(args.datatype1),
747                                                                           Datatype::encode(args.datatype2)));
748
749     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
750                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
751
752     TRACE_smpi_comm_out(my_proc_id);
753   }
754 };
755
756
757 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
758 public:
759   ScatterVAction() : ReplayAction("scatterV") {}
760   void kernel(simgrid::xbt::ReplayAction& action) override
761   {
762     int rank = MPI_COMM_WORLD->rank();
763     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
764           nullptr, Datatype::encode(args.datatype1),
765           Datatype::encode(args.datatype2)));
766
767     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
768                     args.sendcounts->data(), args.disps.data(), args.datatype1,
769                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
770                     MPI_COMM_WORLD);
771
772     TRACE_smpi_comm_out(my_proc_id);
773   }
774 };
775
776 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
777 public:
778   ReduceScatterAction() : ReplayAction("reduceScatter") {}
779   void kernel(simgrid::xbt::ReplayAction& action) override
780   {
781     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
782                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
783                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
784                                                          Datatype::encode(args.datatype1)));
785
786     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
787                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
788                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
789
790     smpi_execute_flops(args.comp_size);
791     TRACE_smpi_comm_out(my_proc_id);
792   }
793 };
794
795 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
796 public:
797   AllToAllVAction() : ReplayAction("allToAllV") {}
798   void kernel(simgrid::xbt::ReplayAction& action) override
799   {
800     TRACE_smpi_comm_in(my_proc_id, __func__,
801                        new simgrid::instr::VarCollTIData(
802                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
803                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
804
805     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
806                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
807
808     TRACE_smpi_comm_out(my_proc_id);
809   }
810 };
811 } // Replay Namespace
812 }} // namespace simgrid::smpi
813
814 /** @brief Only initialize the replay, don't do it for real */
815 void smpi_replay_init(int* argc, char*** argv)
816 {
817   simgrid::smpi::Process::init(argc, argv);
818   smpi_process()->mark_as_initialized();
819   smpi_process()->set_replaying(true);
820
821   int my_proc_id = simgrid::s4u::this_actor::getPid();
822   TRACE_smpi_init(my_proc_id);
823   TRACE_smpi_computing_init(my_proc_id);
824   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
825   TRACE_smpi_comm_out(my_proc_id);
826   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
827   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
828   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
829   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
830   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
831
832   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
833   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
834   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
835   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
836   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
837   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
838   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
839   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
840   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
841   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
842   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
843   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
844   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
845   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
846   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
847   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
848   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
849   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
850   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
851   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
852   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
853
854   //if we have a delayed start, sleep here.
855   if(*argc>2){
856     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
857     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
858     smpi_execute_flops(value);
859   } else {
860     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
861     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
862     smpi_execute_flops(0.0);
863   }
864 }
865
866 /** @brief actually run the replay after initialization */
867 void smpi_replay_main(int* argc, char*** argv)
868 {
869   static int active_processes = 0;
870   active_processes++;
871   simgrid::xbt::replay_runner(*argc, *argv);
872
873   /* and now, finalize everything */
874   /* One active process will stop. Decrease the counter*/
875   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
876   if (not get_reqq_self()->empty()) {
877     unsigned int count_requests=get_reqq_self()->size();
878     MPI_Request requests[count_requests];
879     MPI_Status status[count_requests];
880     unsigned int i=0;
881
882     for (auto const& req : *get_reqq_self()) {
883       requests[i] = req;
884       i++;
885     }
886     simgrid::smpi::Request::waitall(count_requests, requests, status);
887   }
888   delete get_reqq_self();
889   active_processes--;
890
891   if(active_processes==0){
892     /* Last process alive speaking: end the simulated timer */
893     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
894     smpi_free_replay_tmp_buffers();
895   }
896
897   TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
898                      new simgrid::instr::NoOpTIData("finalize"));
899
900   smpi_process()->finalize();
901
902   TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
903   TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
904 }
905
906 /** @brief chain a replay initialization and a replay start */
907 void smpi_replay_run(int* argc, char*** argv)
908 {
909   smpi_replay_init(argc, argv);
910   smpi_replay_main(argc, argv);
911 }