Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Constify attribute.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%zu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional));    \
38   }
39
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
41 {
42   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43     std::string s = boost::algorithm::join(action, " ");
44     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
45   }
46 }
47
48 static std::vector<MPI_Request>* get_reqq_self()
49 {
50   return reqq.at(simgrid::s4u::this_actor::getPid());
51 }
52
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 {
55   reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
56 }
57
58 /* Helper function */
59 static double parse_double(std::string string)
60 {
61   return xbt_str_parse_double(string.c_str(), "%s is not a double");
62 }
63
64 namespace simgrid {
65 namespace smpi {
66
67 namespace replay {
68 class ActionArgParser {
69 public:
70   virtual ~ActionArgParser() = default;
71   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 class CollCommParser : public ActionArgParser {
104 public:
105   double size;
106   double comm_size;
107   double comp_size;
108   int send_size;
109   int recv_size;
110   int root = 0;
111   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
113 };
114
115 class BcastArgParser : public CollCommParser {
116 public:
117   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
118   {
119     CHECK_ACTION_PARAMS(action, 1, 2)
120     size = parse_double(action[2]);
121     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122     if (action.size() > 4)
123       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
124   }
125 };
126
127 class ReduceArgParser : public CollCommParser {
128 public:
129   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
130   {
131     CHECK_ACTION_PARAMS(action, 2, 2)
132     comm_size = parse_double(action[2]);
133     comp_size = parse_double(action[3]);
134     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
135     if (action.size() > 5)
136       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
137   }
138 };
139
140 class AllReduceArgParser : public CollCommParser {
141 public:
142   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
143   {
144     CHECK_ACTION_PARAMS(action, 2, 1)
145     comm_size = parse_double(action[2]);
146     comp_size = parse_double(action[3]);
147     if (action.size() > 4)
148       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
149   }
150 };
151
152 class AllToAllArgParser : public CollCommParser {
153 public:
154   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
155   {
156     CHECK_ACTION_PARAMS(action, 2, 1)
157     comm_size = MPI_COMM_WORLD->size();
158     send_size = parse_double(action[2]);
159     recv_size = parse_double(action[3]);
160
161     if (action.size() > 4)
162       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163     if (action.size() > 5)
164       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
165   }
166 };
167
168 class GatherArgParser : public CollCommParser {
169 public:
170   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
171   {
172     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
173           0 gather 68 68 0 0 0
174         where:
175           1) 68 is the sendcounts
176           2) 68 is the recvcounts
177           3) 0 is the root node
178           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180     */
181     CHECK_ACTION_PARAMS(action, 2, 3)
182     comm_size = MPI_COMM_WORLD->size();
183     send_size = parse_double(action[2]);
184     recv_size = parse_double(action[3]);
185
186     if (name == "gather") {
187       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
188       if (action.size() > 5)
189         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190       if (action.size() > 6)
191         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
192     }
193     else {
194       if (action.size() > 4)
195         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196       if (action.size() > 5)
197         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
198     }
199   }
200 };
201
202 class GatherVArgParser : public CollCommParser {
203 public:
204   int recv_size_sum;
205   std::shared_ptr<std::vector<int>> recvcounts;
206   std::vector<int> disps;
207   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
208   {
209     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210          0 gather 68 68 10 10 10 0 0 0
211        where:
212          1) 68 is the sendcount
213          2) 68 10 10 10 is the recvcounts
214          3) 0 is the root node
215          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217     */
218     comm_size = MPI_COMM_WORLD->size();
219     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220     send_size = parse_double(action[2]);
221     disps     = std::vector<int>(comm_size, 0);
222     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
223
224     if (name == "gatherV") {
225       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226       if (action.size() > 4 + comm_size)
227         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228       if (action.size() > 5 + comm_size)
229         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
230     }
231     else {
232       int datatype_index = 0;
233       int disp_index     = 0;
234       /* The 3 comes from "0 gather <sendcount>", which must always be present.
235        * The + comm_size is the recvcounts array, which must also be present
236        */
237       if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
238         datatype_index = 3 + comm_size;
239         disp_index     = datatype_index + 1;
240         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
241         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
242       } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
243         disp_index     = 3 + comm_size;
244       } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
245         datatype_index = 3 + comm_size;
246         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
247         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
248       }
249
250       if (disp_index != 0) {
251         for (unsigned int i = 0; i < comm_size; i++)
252           disps[i]          = std::stoi(action[disp_index + i]);
253       }
254     }
255
256     for (unsigned int i = 0; i < comm_size; i++) {
257       (*recvcounts)[i] = std::stoi(action[i + 3]);
258     }
259     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
260   }
261 };
262
263 class ScatterArgParser : public CollCommParser {
264 public:
265   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
266   {
267     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
268           0 gather 68 68 0 0 0
269         where:
270           1) 68 is the sendcounts
271           2) 68 is the recvcounts
272           3) 0 is the root node
273           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
274           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
275     */
276     CHECK_ACTION_PARAMS(action, 2, 3)
277     comm_size   = MPI_COMM_WORLD->size();
278     send_size   = parse_double(action[2]);
279     recv_size   = parse_double(action[3]);
280     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
281     if (action.size() > 5)
282       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
283     if (action.size() > 6)
284       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
285   }
286 };
287
288 class ScatterVArgParser : public CollCommParser {
289 public:
290   int recv_size_sum;
291   int send_size_sum;
292   std::shared_ptr<std::vector<int>> sendcounts;
293   std::vector<int> disps;
294   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
295   {
296     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
297        0 gather 68 10 10 10 68 0 0 0
298         where:
299         1) 68 10 10 10 is the sendcounts
300         2) 68 is the recvcount
301         3) 0 is the root node
302         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304     */
305     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
306     recv_size  = parse_double(action[2 + comm_size]);
307     disps      = std::vector<int>(comm_size, 0);
308     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
309
310     if (action.size() > 5 + comm_size)
311       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
312     if (action.size() > 5 + comm_size)
313       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
314
315     for (unsigned int i = 0; i < comm_size; i++) {
316       (*sendcounts)[i] = std::stoi(action[i + 2]);
317     }
318     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
319     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
320   }
321 };
322
323 class ReduceScatterArgParser : public CollCommParser {
324 public:
325   int recv_size_sum;
326   std::shared_ptr<std::vector<int>> recvcounts;
327   std::vector<int> disps;
328   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
329   {
330     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
331          0 reduceScatter 275427 275427 275427 204020 11346849 0
332        where:
333          1) The first four values after the name of the action declare the recvcounts array
334          2) The value 11346849 is the amount of instructions
335          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
336     */
337     comm_size = MPI_COMM_WORLD->size();
338     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
339     comp_size = parse_double(action[2+comm_size]);
340     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
341     if (action.size() > 3 + comm_size)
342       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
343
344     for (unsigned int i = 0; i < comm_size; i++) {
345       recvcounts->push_back(std::stoi(action[i + 2]));
346     }
347     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
348   }
349 };
350
351 class AllToAllVArgParser : public CollCommParser {
352 public:
353   int recv_size_sum;
354   int send_size_sum;
355   std::shared_ptr<std::vector<int>> recvcounts;
356   std::shared_ptr<std::vector<int>> sendcounts;
357   std::vector<int> senddisps;
358   std::vector<int> recvdisps;
359   int send_buf_size;
360   int recv_buf_size;
361   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
362   {
363     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
364           0 allToAllV 100 1 7 10 12 100 1 70 10 5
365        where:
366         1) 100 is the size of the send buffer *sizeof(int),
367         2) 1 7 10 12 is the sendcounts array
368         3) 100*sizeof(int) is the size of the receiver buffer
369         4)  1 70 10 5 is the recvcounts array
370     */
371     comm_size = MPI_COMM_WORLD->size();
372     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
373     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
374     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
375     senddisps  = std::vector<int>(comm_size, 0);
376     recvdisps  = std::vector<int>(comm_size, 0);
377
378     if (action.size() > 5 + 2 * comm_size)
379       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
380     if (action.size() > 5 + 2 * comm_size)
381       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
382
383     send_buf_size=parse_double(action[2]);
384     recv_buf_size=parse_double(action[3+comm_size]);
385     for (unsigned int i = 0; i < comm_size; i++) {
386       (*sendcounts)[i] = std::stoi(action[3 + i]);
387       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
388     }
389     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
390     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
391   }
392 };
393
394 template <class T> class ReplayAction {
395 protected:
396   const std::string name;
397   const int my_proc_id;
398   T args;
399
400 public:
401   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
402   virtual ~ReplayAction() = default;
403
404   virtual void execute(simgrid::xbt::ReplayAction& action)
405   {
406     // Needs to be re-initialized for every action, hence here
407     double start_time = smpi_process()->simulated_elapsed();
408     args.parse(action, name);
409     kernel(action);
410     if (name != "Init")
411       log_timed_action(action, start_time);
412   }
413
414   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
415
416   void* send_buffer(int size)
417   {
418     return smpi_get_tmp_sendbuffer(size);
419   }
420
421   void* recv_buffer(int size)
422   {
423     return smpi_get_tmp_recvbuffer(size);
424   }
425 };
426
427 class WaitAction : public ReplayAction<ActionArgParser> {
428 public:
429   WaitAction() : ReplayAction("Wait") {}
430   void kernel(simgrid::xbt::ReplayAction& action) override
431   {
432     std::string s = boost::algorithm::join(action, " ");
433     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
434     MPI_Request request = get_reqq_self()->back();
435     get_reqq_self()->pop_back();
436
437     if (request == nullptr) {
438       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
439        * return.*/
440       return;
441     }
442
443     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
444
445     // Must be taken before Request::wait() since the request may be set to
446     // MPI_REQUEST_NULL by Request::wait!
447     int src                  = request->comm()->group()->rank(request->src());
448     int dst                  = request->comm()->group()->rank(request->dst());
449     bool is_wait_for_receive = (request->flags() & RECV);
450     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
451     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
452
453     MPI_Status status;
454     Request::wait(&request, &status);
455
456     TRACE_smpi_comm_out(rank);
457     if (is_wait_for_receive)
458       TRACE_smpi_recv(src, dst, 0);
459   }
460 };
461
462 class SendAction : public ReplayAction<SendRecvParser> {
463 public:
464   SendAction() = delete;
465   explicit SendAction(std::string name) : ReplayAction(name) {}
466   void kernel(simgrid::xbt::ReplayAction& action) override
467   {
468     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
469
470     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
471                                                                              Datatype::encode(args.datatype1)));
472     if (not TRACE_smpi_view_internals())
473       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
474
475     if (name == "send") {
476       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
477     } else if (name == "Isend") {
478       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
479       get_reqq_self()->push_back(request);
480     } else {
481       xbt_die("Don't know this action, %s", name.c_str());
482     }
483
484     TRACE_smpi_comm_out(my_proc_id);
485   }
486 };
487
488 class RecvAction : public ReplayAction<SendRecvParser> {
489 public:
490   RecvAction() = delete;
491   explicit RecvAction(std::string name) : ReplayAction(name) {}
492   void kernel(simgrid::xbt::ReplayAction& action) override
493   {
494     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
495
496     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
497                                                                              Datatype::encode(args.datatype1)));
498
499     MPI_Status status;
500     // unknown size from the receiver point of view
501     if (args.size <= 0.0) {
502       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
503       args.size = status.count;
504     }
505
506     if (name == "recv") {
507       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
508     } else if (name == "Irecv") {
509       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
510       get_reqq_self()->push_back(request);
511     }
512
513     TRACE_smpi_comm_out(my_proc_id);
514     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
515     if (name == "recv" && not TRACE_smpi_view_internals()) {
516       TRACE_smpi_recv(src_traced, my_proc_id, 0);
517     }
518   }
519 };
520
521 class ComputeAction : public ReplayAction<ComputeParser> {
522 public:
523   ComputeAction() : ReplayAction("compute") {}
524   void kernel(simgrid::xbt::ReplayAction& action) override
525   {
526     TRACE_smpi_computing_in(my_proc_id, args.flops);
527     smpi_execute_flops(args.flops);
528     TRACE_smpi_computing_out(my_proc_id);
529   }
530 };
531
532 class TestAction : public ReplayAction<ActionArgParser> {
533 public:
534   TestAction() : ReplayAction("Test") {}
535   void kernel(simgrid::xbt::ReplayAction& action) override
536   {
537     MPI_Request request = get_reqq_self()->back();
538     get_reqq_self()->pop_back();
539     // if request is null here, this may mean that a previous test has succeeded
540     // Different times in traced application and replayed version may lead to this
541     // In this case, ignore the extra calls.
542     if (request != nullptr) {
543       TRACE_smpi_testing_in(my_proc_id);
544
545       MPI_Status status;
546       int flag = Request::test(&request, &status);
547
548       XBT_DEBUG("MPI_Test result: %d", flag);
549       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
550        * nullptr.*/
551       get_reqq_self()->push_back(request);
552
553       TRACE_smpi_testing_out(my_proc_id);
554     }
555   }
556 };
557
558 class InitAction : public ReplayAction<ActionArgParser> {
559 public:
560   InitAction() : ReplayAction("Init") {}
561   void kernel(simgrid::xbt::ReplayAction& action) override
562   {
563     CHECK_ACTION_PARAMS(action, 0, 1)
564     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
565                                            : MPI_BYTE;  // default TAU datatype
566
567     /* start a simulated timer */
568     smpi_process()->simulated_start();
569     /*initialize the number of active processes */
570     active_processes = smpi_process_count();
571
572     set_reqq_self(new std::vector<MPI_Request>);
573   }
574 };
575
576 class CommunicatorAction : public ReplayAction<ActionArgParser> {
577 public:
578   CommunicatorAction() : ReplayAction("Comm") {}
579   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
580 };
581
582 class WaitAllAction : public ReplayAction<ActionArgParser> {
583 public:
584   WaitAllAction() : ReplayAction("waitAll") {}
585   void kernel(simgrid::xbt::ReplayAction& action) override
586   {
587     const unsigned int count_requests = get_reqq_self()->size();
588
589     if (count_requests > 0) {
590       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
591       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
592       for (const auto& req : (*get_reqq_self())) {
593         if (req && (req->flags() & RECV)) {
594           sender_receiver.push_back({req->src(), req->dst()});
595         }
596       }
597       MPI_Status status[count_requests];
598       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
599
600       for (auto& pair : sender_receiver) {
601         TRACE_smpi_recv(pair.first, pair.second, 0);
602       }
603       TRACE_smpi_comm_out(my_proc_id);
604     }
605   }
606 };
607
608 class BarrierAction : public ReplayAction<ActionArgParser> {
609 public:
610   BarrierAction() : ReplayAction("barrier") {}
611   void kernel(simgrid::xbt::ReplayAction& action) override
612   {
613     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
614     Colls::barrier(MPI_COMM_WORLD);
615     TRACE_smpi_comm_out(my_proc_id);
616   }
617 };
618
619 class BcastAction : public ReplayAction<BcastArgParser> {
620 public:
621   BcastAction() : ReplayAction("bcast") {}
622   void kernel(simgrid::xbt::ReplayAction& action) override
623   {
624     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
625                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
626                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
627
628     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
629
630     TRACE_smpi_comm_out(my_proc_id);
631   }
632 };
633
634 class ReduceAction : public ReplayAction<ReduceArgParser> {
635 public:
636   ReduceAction() : ReplayAction("reduce") {}
637   void kernel(simgrid::xbt::ReplayAction& action) override
638   {
639     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
640                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
641                                                       args.comm_size, -1, Datatype::encode(args.datatype1), ""));
642
643     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
644         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
645     smpi_execute_flops(args.comp_size);
646
647     TRACE_smpi_comm_out(my_proc_id);
648   }
649 };
650
651 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
652 public:
653   AllReduceAction() : ReplayAction("allReduce") {}
654   void kernel(simgrid::xbt::ReplayAction& action) override
655   {
656     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
657                                                                                 Datatype::encode(args.datatype1), ""));
658
659     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
660         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
661     smpi_execute_flops(args.comp_size);
662
663     TRACE_smpi_comm_out(my_proc_id);
664   }
665 };
666
667 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
668 public:
669   AllToAllAction() : ReplayAction("allToAll") {}
670   void kernel(simgrid::xbt::ReplayAction& action) override
671   {
672     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
673                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
674                                                     Datatype::encode(args.datatype1),
675                                                     Datatype::encode(args.datatype2)));
676
677     Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()), 
678       args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
679       args.recv_size, args.datatype2, MPI_COMM_WORLD);
680
681     TRACE_smpi_comm_out(my_proc_id);
682   }
683 };
684
685 class GatherAction : public ReplayAction<GatherArgParser> {
686 public:
687   explicit GatherAction(std::string name) : ReplayAction(name) {}
688   void kernel(simgrid::xbt::ReplayAction& action) override
689   {
690     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
691                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
692
693     if (name == "gather") {
694       int rank = MPI_COMM_WORLD->rank();
695       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
696                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
697     }
698     else
699       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
701
702     TRACE_smpi_comm_out(my_proc_id);
703   }
704 };
705
706 class GatherVAction : public ReplayAction<GatherVArgParser> {
707 public:
708   explicit GatherVAction(std::string name) : ReplayAction(name) {}
709   void kernel(simgrid::xbt::ReplayAction& action) override
710   {
711     int rank = MPI_COMM_WORLD->rank();
712
713     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
714                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
715                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
716
717     if (name == "gatherV") {
718       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
719                      (rank == args.root) ? recv_buffer(args.recv_size_sum  * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
720                      MPI_COMM_WORLD);
721     }
722     else {
723       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
724                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
725                     MPI_COMM_WORLD);
726     }
727
728     TRACE_smpi_comm_out(my_proc_id);
729   }
730 };
731
732 class ScatterAction : public ReplayAction<ScatterArgParser> {
733 public:
734   ScatterAction() : ReplayAction("scatter") {}
735   void kernel(simgrid::xbt::ReplayAction& action) override
736   {
737     int rank = MPI_COMM_WORLD->rank();
738     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
739                                                                           Datatype::encode(args.datatype1),
740                                                                           Datatype::encode(args.datatype2)));
741
742     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
743                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
744
745     TRACE_smpi_comm_out(my_proc_id);
746   }
747 };
748
749
750 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
751 public:
752   ScatterVAction() : ReplayAction("scatterV") {}
753   void kernel(simgrid::xbt::ReplayAction& action) override
754   {
755     int rank = MPI_COMM_WORLD->rank();
756     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
757           nullptr, Datatype::encode(args.datatype1),
758           Datatype::encode(args.datatype2)));
759
760     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(), 
761         args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
762         MPI_COMM_WORLD);
763
764     TRACE_smpi_comm_out(my_proc_id);
765   }
766 };
767
768 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
769 public:
770   ReduceScatterAction() : ReplayAction("reduceScatter") {}
771   void kernel(simgrid::xbt::ReplayAction& action) override
772   {
773     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
774                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
775                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
776                                                          Datatype::encode(args.datatype1)));
777
778     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()), recv_buffer(args.recv_size_sum * args.datatype1->size()), 
779                           args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
780
781     smpi_execute_flops(args.comp_size);
782     TRACE_smpi_comm_out(my_proc_id);
783   }
784 };
785
786 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
787 public:
788   AllToAllVAction() : ReplayAction("allToAllV") {}
789   void kernel(simgrid::xbt::ReplayAction& action) override
790   {
791     TRACE_smpi_comm_in(my_proc_id, __func__,
792                        new simgrid::instr::VarCollTIData(
793                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
794                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
795
796     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
797                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
798
799     TRACE_smpi_comm_out(my_proc_id);
800   }
801 };
802 } // Replay Namespace
803 }} // namespace simgrid::smpi
804
805 /** @brief Only initialize the replay, don't do it for real */
806 void smpi_replay_init(int* argc, char*** argv)
807 {
808   simgrid::smpi::Process::init(argc, argv);
809   smpi_process()->mark_as_initialized();
810   smpi_process()->set_replaying(true);
811
812   int my_proc_id = simgrid::s4u::this_actor::getPid();
813   TRACE_smpi_init(my_proc_id);
814   TRACE_smpi_computing_init(my_proc_id);
815   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
816   TRACE_smpi_comm_out(my_proc_id);
817   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
818   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
819   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822
823   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
824   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
825   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
826   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
827   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
828   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
829   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
830   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
831   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
832   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
833   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
834   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
835   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
836   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
837   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
838   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
839   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
840   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
841   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
842   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
843   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
844
845   //if we have a delayed start, sleep here.
846   if(*argc>2){
847     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
848     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
849     smpi_execute_flops(value);
850   } else {
851     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
852     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
853     smpi_execute_flops(0.0);
854   }
855 }
856
857 /** @brief actually run the replay after initialization */
858 void smpi_replay_main(int* argc, char*** argv)
859 {
860   simgrid::xbt::replay_runner(*argc, *argv);
861
862   /* and now, finalize everything */
863   /* One active process will stop. Decrease the counter*/
864   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
865   if (not get_reqq_self()->empty()) {
866     unsigned int count_requests=get_reqq_self()->size();
867     MPI_Request requests[count_requests];
868     MPI_Status status[count_requests];
869     unsigned int i=0;
870
871     for (auto const& req : *get_reqq_self()) {
872       requests[i] = req;
873       i++;
874     }
875     simgrid::smpi::Request::waitall(count_requests, requests, status);
876   }
877   delete get_reqq_self();
878   active_processes--;
879
880   if(active_processes==0){
881     /* Last process alive speaking: end the simulated timer */
882     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
883     smpi_free_replay_tmp_buffers();
884   }
885
886   TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
887                      new simgrid::instr::NoOpTIData("finalize"));
888
889   smpi_process()->finalize();
890
891   TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
892   TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
893 }
894
895 /** @brief chain a replay initialization and a replay start */
896 void smpi_replay_run(int* argc, char*** argv)
897 {
898   smpi_replay_init(argc, argv);
899   smpi_replay_main(argc, argv);
900 }