Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright headers.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
26
27 static MPI_Datatype MPI_DEFAULT_TYPE;
28
29 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
30   {                                                                                                                    \
31     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
32       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
33                            "%zu items were given on the line. First two should be process_id and action.  "            \
34                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
35                            "Please contact the Simgrid team if support is needed",                                     \
36              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional));    \
37   }
38
39 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
40 {
41   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
42     std::string s = boost::algorithm::join(action, " ");
43     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
44   }
45 }
46
47 static std::vector<MPI_Request>* get_reqq_self()
48 {
49   return reqq.at(simgrid::s4u::this_actor::getPid());
50 }
51
52 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
53 {
54   reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
55 }
56
57 /* Helper function */
58 static double parse_double(std::string string)
59 {
60   return xbt_str_parse_double(string.c_str(), "%s is not a double");
61 }
62
63 namespace simgrid {
64 namespace smpi {
65
66 namespace replay {
67 class ActionArgParser {
68 public:
69   virtual ~ActionArgParser() = default;
70   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
71 };
72
73 class SendRecvParser : public ActionArgParser {
74 public:
75   /* communication partner; if we send, this is the receiver and vice versa */
76   int partner;
77   double size;
78   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
79
80   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
81   {
82     CHECK_ACTION_PARAMS(action, 2, 1)
83     partner = std::stoi(action[2]);
84     size    = parse_double(action[3]);
85     if (action.size() > 4)
86       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
87   }
88 };
89
90 class ComputeParser : public ActionArgParser {
91 public:
92   /* communication partner; if we send, this is the receiver and vice versa */
93   double flops;
94
95   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
96   {
97     CHECK_ACTION_PARAMS(action, 1, 0)
98     flops = parse_double(action[2]);
99   }
100 };
101
102 class CollCommParser : public ActionArgParser {
103 public:
104   double size;
105   double comm_size;
106   double comp_size;
107   int send_size;
108   int recv_size;
109   int root = 0;
110   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
111   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
112 };
113
114 class BcastArgParser : public CollCommParser {
115 public:
116   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
117   {
118     CHECK_ACTION_PARAMS(action, 1, 2)
119     size = parse_double(action[2]);
120     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
121     if (action.size() > 4)
122       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
123   }
124 };
125
126 class ReduceArgParser : public CollCommParser {
127 public:
128   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
129   {
130     CHECK_ACTION_PARAMS(action, 2, 2)
131     comm_size = parse_double(action[2]);
132     comp_size = parse_double(action[3]);
133     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
134     if (action.size() > 5)
135       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
136   }
137 };
138
139 class AllReduceArgParser : public CollCommParser {
140 public:
141   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
142   {
143     CHECK_ACTION_PARAMS(action, 2, 1)
144     comm_size = parse_double(action[2]);
145     comp_size = parse_double(action[3]);
146     if (action.size() > 4)
147       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
148   }
149 };
150
151 class AllToAllArgParser : public CollCommParser {
152 public:
153   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
154   {
155     CHECK_ACTION_PARAMS(action, 2, 1)
156     comm_size = MPI_COMM_WORLD->size();
157     send_size = parse_double(action[2]);
158     recv_size = parse_double(action[3]);
159
160     if (action.size() > 4)
161       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
162     if (action.size() > 5)
163       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
164   }
165 };
166
167 class GatherArgParser : public CollCommParser {
168 public:
169   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
170   {
171     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
172           0 gather 68 68 0 0 0
173         where:
174           1) 68 is the sendcounts
175           2) 68 is the recvcounts
176           3) 0 is the root node
177           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
178           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
179     */
180     CHECK_ACTION_PARAMS(action, 2, 3)
181     comm_size = MPI_COMM_WORLD->size();
182     send_size = parse_double(action[2]);
183     recv_size = parse_double(action[3]);
184
185     if (name == "gather") {
186       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
187       if (action.size() > 5)
188         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
189       if (action.size() > 6)
190         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
191     }
192     else {
193       if (action.size() > 4)
194         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
195       if (action.size() > 5)
196         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
197     }
198   }
199 };
200
201 class GatherVArgParser : public CollCommParser {
202 public:
203   int recv_size_sum;
204   std::shared_ptr<std::vector<int>> recvcounts;
205   std::vector<int> disps;
206   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
207   {
208     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
209          0 gather 68 68 10 10 10 0 0 0
210        where:
211          1) 68 is the sendcount
212          2) 68 10 10 10 is the recvcounts
213          3) 0 is the root node
214          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
216     */
217     comm_size = MPI_COMM_WORLD->size();
218     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
219     send_size = parse_double(action[2]);
220     disps     = std::vector<int>(comm_size, 0);
221     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
222
223     if (name == "gatherV") {
224       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
225       if (action.size() > 4 + comm_size)
226         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
227       if (action.size() > 5 + comm_size)
228         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
229     }
230     else {
231       int datatype_index = 0;
232       int disp_index     = 0;
233       /* The 3 comes from "0 gather <sendcount>", which must always be present.
234        * The + comm_size is the recvcounts array, which must also be present
235        */
236       if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
237         datatype_index = 3 + comm_size;
238         disp_index     = datatype_index + 1;
239         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
240         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
241       } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
242         disp_index     = 3 + comm_size;
243       } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
244         datatype_index = 3 + comm_size;
245         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
246         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
247       }
248
249       if (disp_index != 0) {
250         for (unsigned int i = 0; i < comm_size; i++)
251           disps[i]          = std::stoi(action[disp_index + i]);
252       }
253     }
254
255     for (unsigned int i = 0; i < comm_size; i++) {
256       (*recvcounts)[i] = std::stoi(action[i + 3]);
257     }
258     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
259   }
260 };
261
262 class ScatterArgParser : public CollCommParser {
263 public:
264   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
265   {
266     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
267           0 gather 68 68 0 0 0
268         where:
269           1) 68 is the sendcounts
270           2) 68 is the recvcounts
271           3) 0 is the root node
272           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
273           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
274     */
275     CHECK_ACTION_PARAMS(action, 2, 3)
276     comm_size   = MPI_COMM_WORLD->size();
277     send_size   = parse_double(action[2]);
278     recv_size   = parse_double(action[3]);
279     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
280     if (action.size() > 5)
281       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
282     if (action.size() > 6)
283       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
284   }
285 };
286
287 class ScatterVArgParser : public CollCommParser {
288 public:
289   int recv_size_sum;
290   int send_size_sum;
291   std::shared_ptr<std::vector<int>> sendcounts;
292   std::vector<int> disps;
293   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
294   {
295     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
296        0 gather 68 10 10 10 68 0 0 0
297         where:
298         1) 68 10 10 10 is the sendcounts
299         2) 68 is the recvcount
300         3) 0 is the root node
301         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
302         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
303     */
304     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
305     recv_size  = parse_double(action[2 + comm_size]);
306     disps      = std::vector<int>(comm_size, 0);
307     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
308
309     if (action.size() > 5 + comm_size)
310       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
311     if (action.size() > 5 + comm_size)
312       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
313
314     for (unsigned int i = 0; i < comm_size; i++) {
315       (*sendcounts)[i] = std::stoi(action[i + 2]);
316     }
317     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
318     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
319   }
320 };
321
322 class ReduceScatterArgParser : public CollCommParser {
323 public:
324   int recv_size_sum;
325   std::shared_ptr<std::vector<int>> recvcounts;
326   std::vector<int> disps;
327   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
328   {
329     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
330          0 reduceScatter 275427 275427 275427 204020 11346849 0
331        where:
332          1) The first four values after the name of the action declare the recvcounts array
333          2) The value 11346849 is the amount of instructions
334          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
335     */
336     comm_size = MPI_COMM_WORLD->size();
337     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
338     comp_size = parse_double(action[2+comm_size]);
339     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
340     if (action.size() > 3 + comm_size)
341       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
342
343     for (unsigned int i = 0; i < comm_size; i++) {
344       recvcounts->push_back(std::stoi(action[i + 2]));
345     }
346     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
347   }
348 };
349
350 class AllToAllVArgParser : public CollCommParser {
351 public:
352   int recv_size_sum;
353   int send_size_sum;
354   std::shared_ptr<std::vector<int>> recvcounts;
355   std::shared_ptr<std::vector<int>> sendcounts;
356   std::vector<int> senddisps;
357   std::vector<int> recvdisps;
358   int send_buf_size;
359   int recv_buf_size;
360   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
361   {
362     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
363           0 allToAllV 100 1 7 10 12 100 1 70 10 5
364        where:
365         1) 100 is the size of the send buffer *sizeof(int),
366         2) 1 7 10 12 is the sendcounts array
367         3) 100*sizeof(int) is the size of the receiver buffer
368         4)  1 70 10 5 is the recvcounts array
369     */
370     comm_size = MPI_COMM_WORLD->size();
371     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
372     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
373     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
374     senddisps  = std::vector<int>(comm_size, 0);
375     recvdisps  = std::vector<int>(comm_size, 0);
376
377     if (action.size() > 5 + 2 * comm_size)
378       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
379     if (action.size() > 5 + 2 * comm_size)
380       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
381
382     send_buf_size=parse_double(action[2]);
383     recv_buf_size=parse_double(action[3+comm_size]);
384     for (unsigned int i = 0; i < comm_size; i++) {
385       (*sendcounts)[i] = std::stoi(action[3 + i]);
386       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
387     }
388     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
389     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
390   }
391 };
392
393 template <class T> class ReplayAction {
394 protected:
395   const std::string name;
396   const int my_proc_id;
397   T args;
398
399 public:
400   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
401   virtual ~ReplayAction() = default;
402
403   virtual void execute(simgrid::xbt::ReplayAction& action)
404   {
405     // Needs to be re-initialized for every action, hence here
406     double start_time = smpi_process()->simulated_elapsed();
407     args.parse(action, name);
408     kernel(action);
409     if (name != "Init")
410       log_timed_action(action, start_time);
411   }
412
413   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
414
415   void* send_buffer(int size)
416   {
417     return smpi_get_tmp_sendbuffer(size);
418   }
419
420   void* recv_buffer(int size)
421   {
422     return smpi_get_tmp_recvbuffer(size);
423   }
424 };
425
426 class WaitAction : public ReplayAction<ActionArgParser> {
427 public:
428   WaitAction() : ReplayAction("Wait") {}
429   void kernel(simgrid::xbt::ReplayAction& action) override
430   {
431     std::string s = boost::algorithm::join(action, " ");
432     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
433     MPI_Request request = get_reqq_self()->back();
434     get_reqq_self()->pop_back();
435
436     if (request == nullptr) {
437       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
438        * return.*/
439       return;
440     }
441
442     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
443
444     // Must be taken before Request::wait() since the request may be set to
445     // MPI_REQUEST_NULL by Request::wait!
446     int src                  = request->comm()->group()->rank(request->src());
447     int dst                  = request->comm()->group()->rank(request->dst());
448     bool is_wait_for_receive = (request->flags() & RECV);
449     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
450     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
451
452     MPI_Status status;
453     Request::wait(&request, &status);
454
455     TRACE_smpi_comm_out(rank);
456     if (is_wait_for_receive)
457       TRACE_smpi_recv(src, dst, 0);
458   }
459 };
460
461 class SendAction : public ReplayAction<SendRecvParser> {
462 public:
463   SendAction() = delete;
464   explicit SendAction(std::string name) : ReplayAction(name) {}
465   void kernel(simgrid::xbt::ReplayAction& action) override
466   {
467     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
468
469     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
470                                                                              Datatype::encode(args.datatype1)));
471     if (not TRACE_smpi_view_internals())
472       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
473
474     if (name == "send") {
475       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
476     } else if (name == "Isend") {
477       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
478       get_reqq_self()->push_back(request);
479     } else {
480       xbt_die("Don't know this action, %s", name.c_str());
481     }
482
483     TRACE_smpi_comm_out(my_proc_id);
484   }
485 };
486
487 class RecvAction : public ReplayAction<SendRecvParser> {
488 public:
489   RecvAction() = delete;
490   explicit RecvAction(std::string name) : ReplayAction(name) {}
491   void kernel(simgrid::xbt::ReplayAction& action) override
492   {
493     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
494
495     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
496                                                                              Datatype::encode(args.datatype1)));
497
498     MPI_Status status;
499     // unknown size from the receiver point of view
500     if (args.size <= 0.0) {
501       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
502       args.size = status.count;
503     }
504
505     if (name == "recv") {
506       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
507     } else if (name == "Irecv") {
508       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
509       get_reqq_self()->push_back(request);
510     }
511
512     TRACE_smpi_comm_out(my_proc_id);
513     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
514     if (name == "recv" && not TRACE_smpi_view_internals()) {
515       TRACE_smpi_recv(src_traced, my_proc_id, 0);
516     }
517   }
518 };
519
520 class ComputeAction : public ReplayAction<ComputeParser> {
521 public:
522   ComputeAction() : ReplayAction("compute") {}
523   void kernel(simgrid::xbt::ReplayAction& action) override
524   {
525     TRACE_smpi_computing_in(my_proc_id, args.flops);
526     smpi_execute_flops(args.flops);
527     TRACE_smpi_computing_out(my_proc_id);
528   }
529 };
530
531 class TestAction : public ReplayAction<ActionArgParser> {
532 public:
533   TestAction() : ReplayAction("Test") {}
534   void kernel(simgrid::xbt::ReplayAction& action) override
535   {
536     MPI_Request request = get_reqq_self()->back();
537     get_reqq_self()->pop_back();
538     // if request is null here, this may mean that a previous test has succeeded
539     // Different times in traced application and replayed version may lead to this
540     // In this case, ignore the extra calls.
541     if (request != nullptr) {
542       TRACE_smpi_testing_in(my_proc_id);
543
544       MPI_Status status;
545       int flag = Request::test(&request, &status);
546
547       XBT_DEBUG("MPI_Test result: %d", flag);
548       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
549        * nullptr.*/
550       get_reqq_self()->push_back(request);
551
552       TRACE_smpi_testing_out(my_proc_id);
553     }
554   }
555 };
556
557 class InitAction : public ReplayAction<ActionArgParser> {
558 public:
559   InitAction() : ReplayAction("Init") {}
560   void kernel(simgrid::xbt::ReplayAction& action) override
561   {
562     CHECK_ACTION_PARAMS(action, 0, 1)
563     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
564                                            : MPI_BYTE;  // default TAU datatype
565
566     /* start a simulated timer */
567     smpi_process()->simulated_start();
568     set_reqq_self(new std::vector<MPI_Request>);
569   }
570 };
571
572 class CommunicatorAction : public ReplayAction<ActionArgParser> {
573 public:
574   CommunicatorAction() : ReplayAction("Comm") {}
575   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
576 };
577
578 class WaitAllAction : public ReplayAction<ActionArgParser> {
579 public:
580   WaitAllAction() : ReplayAction("waitAll") {}
581   void kernel(simgrid::xbt::ReplayAction& action) override
582   {
583     const unsigned int count_requests = get_reqq_self()->size();
584
585     if (count_requests > 0) {
586       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
587       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
588       for (const auto& req : (*get_reqq_self())) {
589         if (req && (req->flags() & RECV)) {
590           sender_receiver.push_back({req->src(), req->dst()});
591         }
592       }
593       MPI_Status status[count_requests];
594       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
595
596       for (auto& pair : sender_receiver) {
597         TRACE_smpi_recv(pair.first, pair.second, 0);
598       }
599       TRACE_smpi_comm_out(my_proc_id);
600     }
601   }
602 };
603
604 class BarrierAction : public ReplayAction<ActionArgParser> {
605 public:
606   BarrierAction() : ReplayAction("barrier") {}
607   void kernel(simgrid::xbt::ReplayAction& action) override
608   {
609     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
610     Colls::barrier(MPI_COMM_WORLD);
611     TRACE_smpi_comm_out(my_proc_id);
612   }
613 };
614
615 class BcastAction : public ReplayAction<BcastArgParser> {
616 public:
617   BcastAction() : ReplayAction("bcast") {}
618   void kernel(simgrid::xbt::ReplayAction& action) override
619   {
620     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
621                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
622                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
623
624     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
625
626     TRACE_smpi_comm_out(my_proc_id);
627   }
628 };
629
630 class ReduceAction : public ReplayAction<ReduceArgParser> {
631 public:
632   ReduceAction() : ReplayAction("reduce") {}
633   void kernel(simgrid::xbt::ReplayAction& action) override
634   {
635     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
636                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
637                                                       args.comp_size, args.comm_size, -1,
638                                                       Datatype::encode(args.datatype1), ""));
639
640     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
641         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
642     smpi_execute_flops(args.comp_size);
643
644     TRACE_smpi_comm_out(my_proc_id);
645   }
646 };
647
648 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
649 public:
650   AllReduceAction() : ReplayAction("allReduce") {}
651   void kernel(simgrid::xbt::ReplayAction& action) override
652   {
653     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
654                                                                                 Datatype::encode(args.datatype1), ""));
655
656     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
657         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
658     smpi_execute_flops(args.comp_size);
659
660     TRACE_smpi_comm_out(my_proc_id);
661   }
662 };
663
664 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
665 public:
666   AllToAllAction() : ReplayAction("allToAll") {}
667   void kernel(simgrid::xbt::ReplayAction& action) override
668   {
669     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
670                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
671                                                     Datatype::encode(args.datatype1),
672                                                     Datatype::encode(args.datatype2)));
673
674     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
675                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
676                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
677
678     TRACE_smpi_comm_out(my_proc_id);
679   }
680 };
681
682 class GatherAction : public ReplayAction<GatherArgParser> {
683 public:
684   explicit GatherAction(std::string name) : ReplayAction(name) {}
685   void kernel(simgrid::xbt::ReplayAction& action) override
686   {
687     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
688                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
689
690     if (name == "gather") {
691       int rank = MPI_COMM_WORLD->rank();
692       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
693                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
694     }
695     else
696       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
698
699     TRACE_smpi_comm_out(my_proc_id);
700   }
701 };
702
703 class GatherVAction : public ReplayAction<GatherVArgParser> {
704 public:
705   explicit GatherVAction(std::string name) : ReplayAction(name) {}
706   void kernel(simgrid::xbt::ReplayAction& action) override
707   {
708     int rank = MPI_COMM_WORLD->rank();
709
710     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
711                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
712                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
713
714     if (name == "gatherV") {
715       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
716                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
717                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
718     }
719     else {
720       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
721                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
722                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
723     }
724
725     TRACE_smpi_comm_out(my_proc_id);
726   }
727 };
728
729 class ScatterAction : public ReplayAction<ScatterArgParser> {
730 public:
731   ScatterAction() : ReplayAction("scatter") {}
732   void kernel(simgrid::xbt::ReplayAction& action) override
733   {
734     int rank = MPI_COMM_WORLD->rank();
735     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
736                                                                           Datatype::encode(args.datatype1),
737                                                                           Datatype::encode(args.datatype2)));
738
739     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
740                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
741
742     TRACE_smpi_comm_out(my_proc_id);
743   }
744 };
745
746
747 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
748 public:
749   ScatterVAction() : ReplayAction("scatterV") {}
750   void kernel(simgrid::xbt::ReplayAction& action) override
751   {
752     int rank = MPI_COMM_WORLD->rank();
753     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
754           nullptr, Datatype::encode(args.datatype1),
755           Datatype::encode(args.datatype2)));
756
757     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
758                     args.sendcounts->data(), args.disps.data(), args.datatype1,
759                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
760                     MPI_COMM_WORLD);
761
762     TRACE_smpi_comm_out(my_proc_id);
763   }
764 };
765
766 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
767 public:
768   ReduceScatterAction() : ReplayAction("reduceScatter") {}
769   void kernel(simgrid::xbt::ReplayAction& action) override
770   {
771     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
772                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
773                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
774                                                          Datatype::encode(args.datatype1)));
775
776     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
777                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
778                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
779
780     smpi_execute_flops(args.comp_size);
781     TRACE_smpi_comm_out(my_proc_id);
782   }
783 };
784
785 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
786 public:
787   AllToAllVAction() : ReplayAction("allToAllV") {}
788   void kernel(simgrid::xbt::ReplayAction& action) override
789   {
790     TRACE_smpi_comm_in(my_proc_id, __func__,
791                        new simgrid::instr::VarCollTIData(
792                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
793                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
794
795     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
796                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
797
798     TRACE_smpi_comm_out(my_proc_id);
799   }
800 };
801 } // Replay Namespace
802 }} // namespace simgrid::smpi
803
804 /** @brief Only initialize the replay, don't do it for real */
805 void smpi_replay_init(int* argc, char*** argv)
806 {
807   simgrid::smpi::Process::init(argc, argv);
808   smpi_process()->mark_as_initialized();
809   smpi_process()->set_replaying(true);
810
811   int my_proc_id = simgrid::s4u::this_actor::getPid();
812   TRACE_smpi_init(my_proc_id);
813   TRACE_smpi_computing_init(my_proc_id);
814   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
815   TRACE_smpi_comm_out(my_proc_id);
816   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
817   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
818   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
819   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821
822   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
823   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
824   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
825   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
826   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
827   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
828   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
829   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
830   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
831   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
832   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
833   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
834   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
835   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
836   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
837   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
838   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
839   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
840   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
841   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
842   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
843
844   //if we have a delayed start, sleep here.
845   if(*argc>2){
846     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
847     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
848     smpi_execute_flops(value);
849   } else {
850     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
851     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
852     smpi_execute_flops(0.0);
853   }
854 }
855
856 /** @brief actually run the replay after initialization */
857 void smpi_replay_main(int* argc, char*** argv)
858 {
859   static int active_processes = 0;
860   active_processes++;
861   simgrid::xbt::replay_runner(*argc, *argv);
862
863   /* and now, finalize everything */
864   /* One active process will stop. Decrease the counter*/
865   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
866   if (not get_reqq_self()->empty()) {
867     unsigned int count_requests=get_reqq_self()->size();
868     MPI_Request requests[count_requests];
869     MPI_Status status[count_requests];
870     unsigned int i=0;
871
872     for (auto const& req : *get_reqq_self()) {
873       requests[i] = req;
874       i++;
875     }
876     simgrid::smpi::Request::waitall(count_requests, requests, status);
877   }
878   delete get_reqq_self();
879   active_processes--;
880
881   if(active_processes==0){
882     /* Last process alive speaking: end the simulated timer */
883     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
884     smpi_free_replay_tmp_buffers();
885   }
886
887   TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
888                      new simgrid::instr::NoOpTIData("finalize"));
889
890   smpi_process()->finalize();
891
892   TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
893   TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
894 }
895
896 /** @brief chain a replay initialization and a replay start */
897 void smpi_replay_run(int* argc, char*** argv)
898 {
899   smpi_replay_init(argc, argv);
900   smpi_replay_main(argc, argv);
901 }