Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
various sonar fixes
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%zu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional));    \
38   }
39
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
41 {
42   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43     std::string s = boost::algorithm::join(action, " ");
44     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
45   }
46 }
47
48 static std::vector<MPI_Request>* get_reqq_self()
49 {
50   return reqq.at(simgrid::s4u::this_actor::getPid());
51 }
52
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 {
55   reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
56 }
57
58 /* Helper function */
59 static double parse_double(std::string string)
60 {
61   return xbt_str_parse_double(string.c_str(), "%s is not a double");
62 }
63
64 namespace simgrid {
65 namespace smpi {
66
67 namespace replay {
68 class ActionArgParser {
69 public:
70   virtual ~ActionArgParser() = default;
71   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 class CollCommParser : public ActionArgParser {
104 public:
105   double size;
106   double comm_size;
107   double comp_size;
108   int send_size;
109   int recv_size;
110   int root = 0;
111   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
113 };
114
115 class BcastArgParser : public CollCommParser {
116 public:
117   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
118   {
119     CHECK_ACTION_PARAMS(action, 1, 2)
120     size = parse_double(action[2]);
121     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122     if (action.size() > 4)
123       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
124   }
125 };
126
127 class ReduceArgParser : public CollCommParser {
128 public:
129   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
130   {
131     CHECK_ACTION_PARAMS(action, 2, 2)
132     comm_size = parse_double(action[2]);
133     comp_size = parse_double(action[3]);
134     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
135     if (action.size() > 5)
136       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
137   }
138 };
139
140 class AllReduceArgParser : public CollCommParser {
141 public:
142   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
143   {
144     CHECK_ACTION_PARAMS(action, 2, 1)
145     comm_size = parse_double(action[2]);
146     comp_size = parse_double(action[3]);
147     if (action.size() > 4)
148       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
149   }
150 };
151
152 class AllToAllArgParser : public CollCommParser {
153 public:
154   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
155   {
156     CHECK_ACTION_PARAMS(action, 2, 1)
157     comm_size = MPI_COMM_WORLD->size();
158     send_size = parse_double(action[2]);
159     recv_size = parse_double(action[3]);
160
161     if (action.size() > 4)
162       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163     if (action.size() > 5)
164       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
165   }
166 };
167
168 class GatherArgParser : public CollCommParser {
169 public:
170   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
171   {
172     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
173           0 gather 68 68 0 0 0
174         where:
175           1) 68 is the sendcounts
176           2) 68 is the recvcounts
177           3) 0 is the root node
178           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180     */
181     CHECK_ACTION_PARAMS(action, 2, 3)
182     comm_size = MPI_COMM_WORLD->size();
183     send_size = parse_double(action[2]);
184     recv_size = parse_double(action[3]);
185
186     if (name == "gather") {
187       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
188       if (action.size() > 5)
189         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190       if (action.size() > 6)
191         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
192     }
193     else {
194       if (action.size() > 4)
195         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196       if (action.size() > 5)
197         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
198     }
199   }
200 };
201
202 class GatherVArgParser : public CollCommParser {
203 public:
204   int recv_size_sum;
205   std::shared_ptr<std::vector<int>> recvcounts;
206   std::vector<int> disps;
207   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
208   {
209     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210          0 gather 68 68 10 10 10 0 0 0
211        where:
212          1) 68 is the sendcount
213          2) 68 10 10 10 is the recvcounts
214          3) 0 is the root node
215          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217     */
218     comm_size = MPI_COMM_WORLD->size();
219     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220     send_size = parse_double(action[2]);
221     disps     = std::vector<int>(comm_size, 0);
222     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
223
224     if (name == "gatherV") {
225       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226       if (action.size() > 4 + comm_size)
227         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228       if (action.size() > 5 + comm_size)
229         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
230     }
231     else {
232       int datatype_index = 0;
233       int disp_index     = 0;
234       /* The 3 comes from "0 gather <sendcount>", which must always be present.
235        * The + comm_size is the recvcounts array, which must also be present
236        */
237       if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
238         datatype_index = 3 + comm_size;
239         disp_index     = datatype_index + 1;
240         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
241         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
242       } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
243         disp_index     = 3 + comm_size;
244       } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
245         datatype_index = 3 + comm_size;
246         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
247         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
248       }
249
250       if (disp_index != 0) {
251         for (unsigned int i = 0; i < comm_size; i++)
252           disps[i]          = std::stoi(action[disp_index + i]);
253       }
254     }
255
256     for (unsigned int i = 0; i < comm_size; i++) {
257       (*recvcounts)[i] = std::stoi(action[i + 3]);
258     }
259     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
260   }
261 };
262
263 class ScatterArgParser : public CollCommParser {
264 public:
265   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
266   {
267     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
268           0 gather 68 68 0 0 0
269         where:
270           1) 68 is the sendcounts
271           2) 68 is the recvcounts
272           3) 0 is the root node
273           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
274           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
275     */
276     CHECK_ACTION_PARAMS(action, 2, 3)
277     comm_size   = MPI_COMM_WORLD->size();
278     send_size   = parse_double(action[2]);
279     recv_size   = parse_double(action[3]);
280     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
281     if (action.size() > 5)
282       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
283     if (action.size() > 6)
284       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
285   }
286 };
287
288 class ScatterVArgParser : public CollCommParser {
289 public:
290   int recv_size_sum;
291   int send_size_sum;
292   std::shared_ptr<std::vector<int>> sendcounts;
293   std::vector<int> disps;
294   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
295   {
296     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
297        0 gather 68 10 10 10 68 0 0 0
298         where:
299         1) 68 10 10 10 is the sendcounts
300         2) 68 is the recvcount
301         3) 0 is the root node
302         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304     */
305     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
306     recv_size  = parse_double(action[2 + comm_size]);
307     disps      = std::vector<int>(comm_size, 0);
308     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
309
310     if (action.size() > 5 + comm_size)
311       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
312     if (action.size() > 5 + comm_size)
313       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
314
315     for (unsigned int i = 0; i < comm_size; i++) {
316       (*sendcounts)[i] = std::stoi(action[i + 2]);
317     }
318     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
319     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
320   }
321 };
322
323 class ReduceScatterArgParser : public CollCommParser {
324 public:
325   int recv_size_sum;
326   std::shared_ptr<std::vector<int>> recvcounts;
327   std::vector<int> disps;
328   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
329   {
330     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
331          0 reduceScatter 275427 275427 275427 204020 11346849 0
332        where:
333          1) The first four values after the name of the action declare the recvcounts array
334          2) The value 11346849 is the amount of instructions
335          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
336     */
337     comm_size = MPI_COMM_WORLD->size();
338     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
339     comp_size = parse_double(action[2+comm_size]);
340     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
341     if (action.size() > 3 + comm_size)
342       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
343
344     for (unsigned int i = 0; i < comm_size; i++) {
345       recvcounts->push_back(std::stoi(action[i + 2]));
346     }
347     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
348   }
349 };
350
351 class AllToAllVArgParser : public CollCommParser {
352 public:
353   int recv_size_sum;
354   int send_size_sum;
355   std::shared_ptr<std::vector<int>> recvcounts;
356   std::shared_ptr<std::vector<int>> sendcounts;
357   std::vector<int> senddisps;
358   std::vector<int> recvdisps;
359   int send_buf_size;
360   int recv_buf_size;
361   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
362   {
363     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
364           0 allToAllV 100 1 7 10 12 100 1 70 10 5
365        where:
366         1) 100 is the size of the send buffer *sizeof(int),
367         2) 1 7 10 12 is the sendcounts array
368         3) 100*sizeof(int) is the size of the receiver buffer
369         4)  1 70 10 5 is the recvcounts array
370     */
371     comm_size = MPI_COMM_WORLD->size();
372     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
373     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
374     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
375     senddisps  = std::vector<int>(comm_size, 0);
376     recvdisps  = std::vector<int>(comm_size, 0);
377
378     if (action.size() > 5 + 2 * comm_size)
379       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
380     if (action.size() > 5 + 2 * comm_size)
381       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
382
383     send_buf_size=parse_double(action[2]);
384     recv_buf_size=parse_double(action[3+comm_size]);
385     for (unsigned int i = 0; i < comm_size; i++) {
386       (*sendcounts)[i] = std::stoi(action[3 + i]);
387       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
388     }
389     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
390     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
391   }
392 };
393
394 template <class T> class ReplayAction {
395 protected:
396   const std::string name;
397   T args;
398
399   int my_proc_id;
400
401 public:
402   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
403   virtual ~ReplayAction() = default;
404
405   virtual void execute(simgrid::xbt::ReplayAction& action)
406   {
407     // Needs to be re-initialized for every action, hence here
408     double start_time = smpi_process()->simulated_elapsed();
409     args.parse(action, name);
410     kernel(action);
411     if (name != "Init")
412       log_timed_action(action, start_time);
413   }
414
415   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
416
417   void* send_buffer(int size)
418   {
419     return smpi_get_tmp_sendbuffer(size);
420   }
421
422   void* recv_buffer(int size)
423   {
424     return smpi_get_tmp_recvbuffer(size);
425   }
426 };
427
428 class WaitAction : public ReplayAction<ActionArgParser> {
429 public:
430   WaitAction() : ReplayAction("Wait") {}
431   void kernel(simgrid::xbt::ReplayAction& action) override
432   {
433     std::string s = boost::algorithm::join(action, " ");
434     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
435     MPI_Request request = get_reqq_self()->back();
436     get_reqq_self()->pop_back();
437
438     if (request == nullptr) {
439       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
440        * return.*/
441       return;
442     }
443
444     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
445
446     // Must be taken before Request::wait() since the request may be set to
447     // MPI_REQUEST_NULL by Request::wait!
448     int src                  = request->comm()->group()->rank(request->src());
449     int dst                  = request->comm()->group()->rank(request->dst());
450     bool is_wait_for_receive = (request->flags() & RECV);
451     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
452     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
453
454     MPI_Status status;
455     Request::wait(&request, &status);
456
457     TRACE_smpi_comm_out(rank);
458     if (is_wait_for_receive)
459       TRACE_smpi_recv(src, dst, 0);
460   }
461 };
462
463 class SendAction : public ReplayAction<SendRecvParser> {
464 public:
465   SendAction() = delete;
466   explicit SendAction(std::string name) : ReplayAction(name) {}
467   void kernel(simgrid::xbt::ReplayAction& action) override
468   {
469     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
470
471     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
472                                                                              Datatype::encode(args.datatype1)));
473     if (not TRACE_smpi_view_internals())
474       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
475
476     if (name == "send") {
477       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
478     } else if (name == "Isend") {
479       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
480       get_reqq_self()->push_back(request);
481     } else {
482       xbt_die("Don't know this action, %s", name.c_str());
483     }
484
485     TRACE_smpi_comm_out(my_proc_id);
486   }
487 };
488
489 class RecvAction : public ReplayAction<SendRecvParser> {
490 public:
491   RecvAction() = delete;
492   explicit RecvAction(std::string name) : ReplayAction(name) {}
493   void kernel(simgrid::xbt::ReplayAction& action) override
494   {
495     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
496
497     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
498                                                                              Datatype::encode(args.datatype1)));
499
500     MPI_Status status;
501     // unknown size from the receiver point of view
502     if (args.size <= 0.0) {
503       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
504       args.size = status.count;
505     }
506
507     if (name == "recv") {
508       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
509     } else if (name == "Irecv") {
510       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
511       get_reqq_self()->push_back(request);
512     }
513
514     TRACE_smpi_comm_out(my_proc_id);
515     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
516     if (name == "recv" && not TRACE_smpi_view_internals()) {
517       TRACE_smpi_recv(src_traced, my_proc_id, 0);
518     }
519   }
520 };
521
522 class ComputeAction : public ReplayAction<ComputeParser> {
523 public:
524   ComputeAction() : ReplayAction("compute") {}
525   void kernel(simgrid::xbt::ReplayAction& action) override
526   {
527     TRACE_smpi_computing_in(my_proc_id, args.flops);
528     smpi_execute_flops(args.flops);
529     TRACE_smpi_computing_out(my_proc_id);
530   }
531 };
532
533 class TestAction : public ReplayAction<ActionArgParser> {
534 public:
535   TestAction() : ReplayAction("Test") {}
536   void kernel(simgrid::xbt::ReplayAction& action) override
537   {
538     MPI_Request request = get_reqq_self()->back();
539     get_reqq_self()->pop_back();
540     // if request is null here, this may mean that a previous test has succeeded
541     // Different times in traced application and replayed version may lead to this
542     // In this case, ignore the extra calls.
543     if (request != nullptr) {
544       TRACE_smpi_testing_in(my_proc_id);
545
546       MPI_Status status;
547       int flag = Request::test(&request, &status);
548
549       XBT_DEBUG("MPI_Test result: %d", flag);
550       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
551        * nullptr.*/
552       get_reqq_self()->push_back(request);
553
554       TRACE_smpi_testing_out(my_proc_id);
555     }
556   }
557 };
558
559 class InitAction : public ReplayAction<ActionArgParser> {
560 public:
561   InitAction() : ReplayAction("Init") {}
562   void kernel(simgrid::xbt::ReplayAction& action) override
563   {
564     CHECK_ACTION_PARAMS(action, 0, 1)
565     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
566                                            : MPI_BYTE;  // default TAU datatype
567
568     /* start a simulated timer */
569     smpi_process()->simulated_start();
570     /*initialize the number of active processes */
571     active_processes = smpi_process_count();
572
573     set_reqq_self(new std::vector<MPI_Request>);
574   }
575 };
576
577 class CommunicatorAction : public ReplayAction<ActionArgParser> {
578 public:
579   CommunicatorAction() : ReplayAction("Comm") {}
580   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
581 };
582
583 class WaitAllAction : public ReplayAction<ActionArgParser> {
584 public:
585   WaitAllAction() : ReplayAction("waitAll") {}
586   void kernel(simgrid::xbt::ReplayAction& action) override
587   {
588     const unsigned int count_requests = get_reqq_self()->size();
589
590     if (count_requests > 0) {
591       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
592       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
593       for (const auto& req : (*get_reqq_self())) {
594         if (req && (req->flags() & RECV)) {
595           sender_receiver.push_back({req->src(), req->dst()});
596         }
597       }
598       MPI_Status status[count_requests];
599       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
600
601       for (auto& pair : sender_receiver) {
602         TRACE_smpi_recv(pair.first, pair.second, 0);
603       }
604       TRACE_smpi_comm_out(my_proc_id);
605     }
606   }
607 };
608
609 class BarrierAction : public ReplayAction<ActionArgParser> {
610 public:
611   BarrierAction() : ReplayAction("barrier") {}
612   void kernel(simgrid::xbt::ReplayAction& action) override
613   {
614     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
615     Colls::barrier(MPI_COMM_WORLD);
616     TRACE_smpi_comm_out(my_proc_id);
617   }
618 };
619
620 class BcastAction : public ReplayAction<BcastArgParser> {
621 public:
622   BcastAction() : ReplayAction("bcast") {}
623   void kernel(simgrid::xbt::ReplayAction& action) override
624   {
625     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
626                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
627                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
628
629     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
630
631     TRACE_smpi_comm_out(my_proc_id);
632   }
633 };
634
635 class ReduceAction : public ReplayAction<ReduceArgParser> {
636 public:
637   ReduceAction() : ReplayAction("reduce") {}
638   void kernel(simgrid::xbt::ReplayAction& action) override
639   {
640     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
641                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
642                                                       args.comm_size, -1, Datatype::encode(args.datatype1), ""));
643
644     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
645         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
646     smpi_execute_flops(args.comp_size);
647
648     TRACE_smpi_comm_out(my_proc_id);
649   }
650 };
651
652 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
653 public:
654   AllReduceAction() : ReplayAction("allReduce") {}
655   void kernel(simgrid::xbt::ReplayAction& action) override
656   {
657     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
658                                                                                 Datatype::encode(args.datatype1), ""));
659
660     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
661         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
662     smpi_execute_flops(args.comp_size);
663
664     TRACE_smpi_comm_out(my_proc_id);
665   }
666 };
667
668 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
669 public:
670   AllToAllAction() : ReplayAction("allToAll") {}
671   void kernel(simgrid::xbt::ReplayAction& action) override
672   {
673     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
674                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
675                                                     Datatype::encode(args.datatype1),
676                                                     Datatype::encode(args.datatype2)));
677
678     Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()), 
679       args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
680       args.recv_size, args.datatype2, MPI_COMM_WORLD);
681
682     TRACE_smpi_comm_out(my_proc_id);
683   }
684 };
685
686 class GatherAction : public ReplayAction<GatherArgParser> {
687 public:
688   explicit GatherAction(std::string name) : ReplayAction(name) {}
689   void kernel(simgrid::xbt::ReplayAction& action) override
690   {
691     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
692                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
693
694     if (name == "gather") {
695       int rank = MPI_COMM_WORLD->rank();
696       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
698     }
699     else
700       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
702
703     TRACE_smpi_comm_out(my_proc_id);
704   }
705 };
706
707 class GatherVAction : public ReplayAction<GatherVArgParser> {
708 public:
709   explicit GatherVAction(std::string name) : ReplayAction(name) {}
710   void kernel(simgrid::xbt::ReplayAction& action) override
711   {
712     int rank = MPI_COMM_WORLD->rank();
713
714     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
715                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
716                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
717
718     if (name == "gatherV") {
719       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
720                      (rank == args.root) ? recv_buffer(args.recv_size_sum  * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
721                      MPI_COMM_WORLD);
722     }
723     else {
724       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
725                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
726                     MPI_COMM_WORLD);
727     }
728
729     TRACE_smpi_comm_out(my_proc_id);
730   }
731 };
732
733 class ScatterAction : public ReplayAction<ScatterArgParser> {
734 public:
735   ScatterAction() : ReplayAction("scatter") {}
736   void kernel(simgrid::xbt::ReplayAction& action) override
737   {
738     int rank = MPI_COMM_WORLD->rank();
739     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
740                                                                           Datatype::encode(args.datatype1),
741                                                                           Datatype::encode(args.datatype2)));
742
743     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
744                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
745
746     TRACE_smpi_comm_out(my_proc_id);
747   }
748 };
749
750
751 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
752 public:
753   ScatterVAction() : ReplayAction("scatterV") {}
754   void kernel(simgrid::xbt::ReplayAction& action) override
755   {
756     int rank = MPI_COMM_WORLD->rank();
757     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
758           nullptr, Datatype::encode(args.datatype1),
759           Datatype::encode(args.datatype2)));
760
761     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(), 
762         args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
763         MPI_COMM_WORLD);
764
765     TRACE_smpi_comm_out(my_proc_id);
766   }
767 };
768
769 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
770 public:
771   ReduceScatterAction() : ReplayAction("reduceScatter") {}
772   void kernel(simgrid::xbt::ReplayAction& action) override
773   {
774     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
775                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
776                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
777                                                          Datatype::encode(args.datatype1)));
778
779     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()), recv_buffer(args.recv_size_sum * args.datatype1->size()), 
780                           args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
781
782     smpi_execute_flops(args.comp_size);
783     TRACE_smpi_comm_out(my_proc_id);
784   }
785 };
786
787 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
788 public:
789   AllToAllVAction() : ReplayAction("allToAllV") {}
790   void kernel(simgrid::xbt::ReplayAction& action) override
791   {
792     TRACE_smpi_comm_in(my_proc_id, __func__,
793                        new simgrid::instr::VarCollTIData(
794                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
795                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
796
797     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
798                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
799
800     TRACE_smpi_comm_out(my_proc_id);
801   }
802 };
803 } // Replay Namespace
804 }} // namespace simgrid::smpi
805
806 /** @brief Only initialize the replay, don't do it for real */
807 void smpi_replay_init(int* argc, char*** argv)
808 {
809   simgrid::smpi::Process::init(argc, argv);
810   smpi_process()->mark_as_initialized();
811   smpi_process()->set_replaying(true);
812
813   int my_proc_id = simgrid::s4u::this_actor::getPid();
814   TRACE_smpi_init(my_proc_id);
815   TRACE_smpi_computing_init(my_proc_id);
816   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
817   TRACE_smpi_comm_out(my_proc_id);
818   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
819   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
820   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
823
824   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
825   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
826   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
827   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
828   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
829   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
830   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
831   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
832   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
833   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
834   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
835   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
836   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
837   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
838   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
839   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
840   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
841   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
842   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
843   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
844   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
845
846   //if we have a delayed start, sleep here.
847   if(*argc>2){
848     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
849     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
850     smpi_execute_flops(value);
851   } else {
852     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
853     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
854     smpi_execute_flops(0.0);
855   }
856 }
857
858 /** @brief actually run the replay after initialization */
859 void smpi_replay_main(int* argc, char*** argv)
860 {
861   simgrid::xbt::replay_runner(*argc, *argv);
862
863   /* and now, finalize everything */
864   /* One active process will stop. Decrease the counter*/
865   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
866   if (not get_reqq_self()->empty()) {
867     unsigned int count_requests=get_reqq_self()->size();
868     MPI_Request requests[count_requests];
869     MPI_Status status[count_requests];
870     unsigned int i=0;
871
872     for (auto const& req : *get_reqq_self()) {
873       requests[i] = req;
874       i++;
875     }
876     simgrid::smpi::Request::waitall(count_requests, requests, status);
877   }
878   delete get_reqq_self();
879   active_processes--;
880
881   if(active_processes==0){
882     /* Last process alive speaking: end the simulated timer */
883     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
884     smpi_free_replay_tmp_buffers();
885   }
886
887   TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
888                      new simgrid::instr::NoOpTIData("finalize"));
889
890   smpi_process()->finalize();
891
892   TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
893   TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
894 }
895
896 /** @brief chain a replay initialization and a replay start */
897 void smpi_replay_run(int* argc, char*** argv)
898 {
899   smpi_replay_init(argc, argv);
900   smpi_replay_main(argc, argv);
901 }