Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
prefer this_actor:: to Actor::self()->
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%zu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional));    \
38   }
39
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
41 {
42   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43     std::string s = boost::algorithm::join(action, " ");
44     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
45   }
46 }
47
48 static std::vector<MPI_Request>* get_reqq_self()
49 {
50   return reqq.at(simgrid::s4u::this_actor::getPid());
51 }
52
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 {
55   reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
56 }
57
58 /* Helper function */
59 static double parse_double(std::string string)
60 {
61   return xbt_str_parse_double(string.c_str(), "%s is not a double");
62 }
63
64 namespace simgrid {
65 namespace smpi {
66
67 namespace Replay {
68 class ActionArgParser {
69 public:
70   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
71 };
72
73 class SendRecvParser : public ActionArgParser {
74 public:
75   /* communication partner; if we send, this is the receiver and vice versa */
76   int partner;
77   double size;
78   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
79
80   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
81   {
82     CHECK_ACTION_PARAMS(action, 2, 1)
83     partner = std::stoi(action[2]);
84     size    = parse_double(action[3]);
85     if (action.size() > 4)
86       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
87   }
88 };
89
90 class ComputeParser : public ActionArgParser {
91 public:
92   /* communication partner; if we send, this is the receiver and vice versa */
93   double flops;
94
95   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
96   {
97     CHECK_ACTION_PARAMS(action, 1, 0)
98     flops = parse_double(action[2]);
99   }
100 };
101
102 class CollCommParser : public ActionArgParser {
103 public:
104   double size;
105   double comm_size;
106   double comp_size;
107   int send_size;
108   int recv_size;
109   int root = 0;
110   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
111   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
112 };
113
114 class BcastArgParser : public CollCommParser {
115 public:
116   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
117   {
118     CHECK_ACTION_PARAMS(action, 1, 2)
119     size = parse_double(action[2]);
120     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
121     if (action.size() > 4)
122       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
123   }
124 };
125
126 class ReduceArgParser : public CollCommParser {
127 public:
128   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
129   {
130     CHECK_ACTION_PARAMS(action, 2, 2)
131     comm_size = parse_double(action[2]);
132     comp_size = parse_double(action[3]);
133     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
134     if (action.size() > 5)
135       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
136   }
137 };
138
139 class AllReduceArgParser : public CollCommParser {
140 public:
141   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
142   {
143     CHECK_ACTION_PARAMS(action, 2, 1)
144     comm_size = parse_double(action[2]);
145     comp_size = parse_double(action[3]);
146     if (action.size() > 4)
147       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
148   }
149 };
150
151 class AllToAllArgParser : public CollCommParser {
152 public:
153   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
154   {
155     CHECK_ACTION_PARAMS(action, 2, 1)
156     comm_size = MPI_COMM_WORLD->size();
157     send_size = parse_double(action[2]);
158     recv_size = parse_double(action[3]);
159
160     if (action.size() > 4)
161       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
162     if (action.size() > 5)
163       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
164   }
165 };
166
167 class GatherArgParser : public CollCommParser {
168 public:
169   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
170   {
171     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
172           0 gather 68 68 0 0 0
173         where:
174           1) 68 is the sendcounts
175           2) 68 is the recvcounts
176           3) 0 is the root node
177           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
178           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
179     */
180     CHECK_ACTION_PARAMS(action, 2, 3)
181     comm_size = MPI_COMM_WORLD->size();
182     send_size = parse_double(action[2]);
183     recv_size = parse_double(action[3]);
184
185     if (name == "gather") {
186       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
187       if (action.size() > 5)
188         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
189       if (action.size() > 6)
190         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
191     }
192     else {
193       if (action.size() > 4)
194         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
195       if (action.size() > 5)
196         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
197     }
198   }
199 };
200
201 class GatherVArgParser : public CollCommParser {
202 public:
203   int recv_size_sum;
204   std::shared_ptr<std::vector<int>> recvcounts;
205   std::vector<int> disps;
206   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
207   {
208     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
209          0 gather 68 68 10 10 10 0 0 0
210        where:
211          1) 68 is the sendcount
212          2) 68 10 10 10 is the recvcounts
213          3) 0 is the root node
214          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
216     */
217     comm_size = MPI_COMM_WORLD->size();
218     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
219     send_size = parse_double(action[2]);
220     disps     = std::vector<int>(comm_size, 0);
221     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
222
223     if (name == "gatherV") {
224       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
225       if (action.size() > 4 + comm_size)
226         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
227       if (action.size() > 5 + comm_size)
228         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
229     }
230     else {
231       int datatype_index = 0, disp_index = 0;
232       /* The 3 comes from "0 gather <sendcount>", which must always be present.
233        * The + comm_size is the recvcounts array, which must also be present
234        */
235       if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
236         datatype_index = 3 + comm_size;
237         disp_index     = datatype_index + 1;
238         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
239         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
240       } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
241         disp_index     = 3 + comm_size;
242       } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
243         datatype_index = 3 + comm_size;
244         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
245         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
246       }
247
248       if (disp_index != 0) {
249         for (unsigned int i = 0; i < comm_size; i++)
250           disps[i]          = std::stoi(action[disp_index + i]);
251       }
252     }
253
254     for (unsigned int i = 0; i < comm_size; i++) {
255       (*recvcounts)[i] = std::stoi(action[i + 3]);
256     }
257     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
258   }
259 };
260
261 class ScatterArgParser : public CollCommParser {
262 public:
263   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
264   {
265     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
266           0 gather 68 68 0 0 0
267         where:
268           1) 68 is the sendcounts
269           2) 68 is the recvcounts
270           3) 0 is the root node
271           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
272           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
273     */
274     CHECK_ACTION_PARAMS(action, 2, 3)
275     comm_size   = MPI_COMM_WORLD->size();
276     send_size   = parse_double(action[2]);
277     recv_size   = parse_double(action[3]);
278     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
279     if (action.size() > 5)
280       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
281     if (action.size() > 6)
282       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
283   }
284 };
285
286 class ScatterVArgParser : public CollCommParser {
287 public:
288   int recv_size_sum;
289   int send_size_sum;
290   std::shared_ptr<std::vector<int>> sendcounts;
291   std::vector<int> disps;
292   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
293   {
294     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
295        0 gather 68 10 10 10 68 0 0 0
296         where:
297         1) 68 10 10 10 is the sendcounts
298         2) 68 is the recvcount
299         3) 0 is the root node
300         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
301         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
302     */
303     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
304     recv_size  = parse_double(action[2 + comm_size]);
305     disps      = std::vector<int>(comm_size, 0);
306     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
307
308     if (action.size() > 5 + comm_size)
309       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
310     if (action.size() > 5 + comm_size)
311       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
312
313     for (unsigned int i = 0; i < comm_size; i++) {
314       (*sendcounts)[i] = std::stoi(action[i + 2]);
315     }
316     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
317     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
318   }
319 };
320
321 class ReduceScatterArgParser : public CollCommParser {
322 public:
323   int recv_size_sum;
324   std::shared_ptr<std::vector<int>> recvcounts;
325   std::vector<int> disps;
326   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
327   {
328     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
329          0 reduceScatter 275427 275427 275427 204020 11346849 0
330        where:
331          1) The first four values after the name of the action declare the recvcounts array
332          2) The value 11346849 is the amount of instructions
333          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
334     */
335     comm_size = MPI_COMM_WORLD->size();
336     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
337     comp_size = parse_double(action[2+comm_size]);
338     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
339     if (action.size() > 3 + comm_size)
340       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
341
342     for (unsigned int i = 0; i < comm_size; i++) {
343       recvcounts->push_back(std::stoi(action[i + 2]));
344     }
345     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
346   }
347 };
348
349 class AllToAllVArgParser : public CollCommParser {
350 public:
351   int recv_size_sum;
352   int send_size_sum;
353   std::shared_ptr<std::vector<int>> recvcounts;
354   std::shared_ptr<std::vector<int>> sendcounts;
355   std::vector<int> senddisps;
356   std::vector<int> recvdisps;
357   int send_buf_size;
358   int recv_buf_size;
359   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
360   {
361     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
362           0 allToAllV 100 1 7 10 12 100 1 70 10 5
363        where:
364         1) 100 is the size of the send buffer *sizeof(int),
365         2) 1 7 10 12 is the sendcounts array
366         3) 100*sizeof(int) is the size of the receiver buffer
367         4)  1 70 10 5 is the recvcounts array
368     */
369     comm_size = MPI_COMM_WORLD->size();
370     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
371     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
372     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
373     senddisps  = std::vector<int>(comm_size, 0);
374     recvdisps  = std::vector<int>(comm_size, 0);
375
376     if (action.size() > 5 + 2 * comm_size)
377       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
378     if (action.size() > 5 + 2 * comm_size)
379       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
380
381     send_buf_size=parse_double(action[2]);
382     recv_buf_size=parse_double(action[3+comm_size]);
383     for (unsigned int i = 0; i < comm_size; i++) {
384       (*sendcounts)[i] = std::stoi(action[3 + i]);
385       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
386     }
387     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
388     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
389   }
390 };
391
392 template <class T> class ReplayAction {
393 protected:
394   const std::string name;
395   T args;
396
397   int my_proc_id;
398
399 public:
400   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
401
402   virtual void execute(simgrid::xbt::ReplayAction& action)
403   {
404     // Needs to be re-initialized for every action, hence here
405     double start_time = smpi_process()->simulated_elapsed();
406     args.parse(action, name);
407     kernel(action);
408     if (name != "Init")
409       log_timed_action(action, start_time);
410   }
411
412   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
413
414   void* send_buffer(int size)
415   {
416     return smpi_get_tmp_sendbuffer(size);
417   }
418
419   void* recv_buffer(int size)
420   {
421     return smpi_get_tmp_recvbuffer(size);
422   }
423 };
424
425 class WaitAction : public ReplayAction<ActionArgParser> {
426 public:
427   WaitAction() : ReplayAction("Wait") {}
428   void kernel(simgrid::xbt::ReplayAction& action) override
429   {
430     std::string s = boost::algorithm::join(action, " ");
431     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
432     MPI_Request request = get_reqq_self()->back();
433     get_reqq_self()->pop_back();
434
435     if (request == nullptr) {
436       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
437        * return.*/
438       return;
439     }
440
441     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
442
443     // Must be taken before Request::wait() since the request may be set to
444     // MPI_REQUEST_NULL by Request::wait!
445     int src                  = request->comm()->group()->rank(request->src());
446     int dst                  = request->comm()->group()->rank(request->dst());
447     bool is_wait_for_receive = (request->flags() & RECV);
448     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
449     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
450
451     MPI_Status status;
452     Request::wait(&request, &status);
453
454     TRACE_smpi_comm_out(rank);
455     if (is_wait_for_receive)
456       TRACE_smpi_recv(src, dst, 0);
457   }
458 };
459
460 class SendAction : public ReplayAction<SendRecvParser> {
461 public:
462   SendAction() = delete;
463   SendAction(std::string name) : ReplayAction(name) {}
464   void kernel(simgrid::xbt::ReplayAction& action) override
465   {
466     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
467
468     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
469                                                                              Datatype::encode(args.datatype1)));
470     if (not TRACE_smpi_view_internals())
471       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
472
473     if (name == "send") {
474       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
475     } else if (name == "Isend") {
476       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
477       get_reqq_self()->push_back(request);
478     } else {
479       xbt_die("Don't know this action, %s", name.c_str());
480     }
481
482     TRACE_smpi_comm_out(my_proc_id);
483   }
484 };
485
486 class RecvAction : public ReplayAction<SendRecvParser> {
487 public:
488   RecvAction() = delete;
489   explicit RecvAction(std::string name) : ReplayAction(name) {}
490   void kernel(simgrid::xbt::ReplayAction& action) override
491   {
492     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
493
494     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
495                                                                              Datatype::encode(args.datatype1)));
496
497     MPI_Status status;
498     // unknown size from the receiver point of view
499     if (args.size <= 0.0) {
500       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
501       args.size = status.count;
502     }
503
504     if (name == "recv") {
505       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
506     } else if (name == "Irecv") {
507       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
508       get_reqq_self()->push_back(request);
509     }
510
511     TRACE_smpi_comm_out(my_proc_id);
512     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
513     if (name == "recv" && not TRACE_smpi_view_internals()) {
514       TRACE_smpi_recv(src_traced, my_proc_id, 0);
515     }
516   }
517 };
518
519 class ComputeAction : public ReplayAction<ComputeParser> {
520 public:
521   ComputeAction() : ReplayAction("compute") {}
522   void kernel(simgrid::xbt::ReplayAction& action) override
523   {
524     TRACE_smpi_computing_in(my_proc_id, args.flops);
525     smpi_execute_flops(args.flops);
526     TRACE_smpi_computing_out(my_proc_id);
527   }
528 };
529
530 class TestAction : public ReplayAction<ActionArgParser> {
531 public:
532   TestAction() : ReplayAction("Test") {}
533   void kernel(simgrid::xbt::ReplayAction& action) override
534   {
535     MPI_Request request = get_reqq_self()->back();
536     get_reqq_self()->pop_back();
537     // if request is null here, this may mean that a previous test has succeeded
538     // Different times in traced application and replayed version may lead to this
539     // In this case, ignore the extra calls.
540     if (request != nullptr) {
541       TRACE_smpi_testing_in(my_proc_id);
542
543       MPI_Status status;
544       int flag = Request::test(&request, &status);
545
546       XBT_DEBUG("MPI_Test result: %d", flag);
547       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
548        * nullptr.*/
549       get_reqq_self()->push_back(request);
550
551       TRACE_smpi_testing_out(my_proc_id);
552     }
553   }
554 };
555
556 class InitAction : public ReplayAction<ActionArgParser> {
557 public:
558   InitAction() : ReplayAction("Init") {}
559   void kernel(simgrid::xbt::ReplayAction& action) override
560   {
561     CHECK_ACTION_PARAMS(action, 0, 1)
562     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
563                                            : MPI_BYTE;  // default TAU datatype
564
565     /* start a simulated timer */
566     smpi_process()->simulated_start();
567     /*initialize the number of active processes */
568     active_processes = smpi_process_count();
569
570     set_reqq_self(new std::vector<MPI_Request>);
571   }
572 };
573
574 class CommunicatorAction : public ReplayAction<ActionArgParser> {
575 public:
576   CommunicatorAction() : ReplayAction("Comm") {}
577   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
578 };
579
580 class WaitAllAction : public ReplayAction<ActionArgParser> {
581 public:
582   WaitAllAction() : ReplayAction("waitAll") {}
583   void kernel(simgrid::xbt::ReplayAction& action) override
584   {
585     const unsigned int count_requests = get_reqq_self()->size();
586
587     if (count_requests > 0) {
588       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
589       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
590       for (const auto& req : (*get_reqq_self())) {
591         if (req && (req->flags() & RECV)) {
592           sender_receiver.push_back({req->src(), req->dst()});
593         }
594       }
595       MPI_Status status[count_requests];
596       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
597
598       for (auto& pair : sender_receiver) {
599         TRACE_smpi_recv(pair.first, pair.second, 0);
600       }
601       TRACE_smpi_comm_out(my_proc_id);
602     }
603   }
604 };
605
606 class BarrierAction : public ReplayAction<ActionArgParser> {
607 public:
608   BarrierAction() : ReplayAction("barrier") {}
609   void kernel(simgrid::xbt::ReplayAction& action) override
610   {
611     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
612     Colls::barrier(MPI_COMM_WORLD);
613     TRACE_smpi_comm_out(my_proc_id);
614   }
615 };
616
617 class BcastAction : public ReplayAction<BcastArgParser> {
618 public:
619   BcastAction() : ReplayAction("bcast") {}
620   void kernel(simgrid::xbt::ReplayAction& action) override
621   {
622     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
623                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
624                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
625
626     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
627
628     TRACE_smpi_comm_out(my_proc_id);
629   }
630 };
631
632 class ReduceAction : public ReplayAction<ReduceArgParser> {
633 public:
634   ReduceAction() : ReplayAction("reduce") {}
635   void kernel(simgrid::xbt::ReplayAction& action) override
636   {
637     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
638                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
639                                                       args.comm_size, -1, Datatype::encode(args.datatype1), ""));
640
641     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
642         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
643     smpi_execute_flops(args.comp_size);
644
645     TRACE_smpi_comm_out(my_proc_id);
646   }
647 };
648
649 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
650 public:
651   AllReduceAction() : ReplayAction("allReduce") {}
652   void kernel(simgrid::xbt::ReplayAction& action) override
653   {
654     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
655                                                                                 Datatype::encode(args.datatype1), ""));
656
657     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
658         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
659     smpi_execute_flops(args.comp_size);
660
661     TRACE_smpi_comm_out(my_proc_id);
662   }
663 };
664
665 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
666 public:
667   AllToAllAction() : ReplayAction("allToAll") {}
668   void kernel(simgrid::xbt::ReplayAction& action) override
669   {
670     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
671                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
672                                                     Datatype::encode(args.datatype1),
673                                                     Datatype::encode(args.datatype2)));
674
675     Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()), 
676       args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
677       args.recv_size, args.datatype2, MPI_COMM_WORLD);
678
679     TRACE_smpi_comm_out(my_proc_id);
680   }
681 };
682
683 class GatherAction : public ReplayAction<GatherArgParser> {
684 public:
685   GatherAction(std::string name) : ReplayAction(name) {}
686   void kernel(simgrid::xbt::ReplayAction& action) override
687   {
688     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
689                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
690
691     if (name == "gather") {
692       int rank = MPI_COMM_WORLD->rank();
693       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
694                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
695     }
696     else
697       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
698                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
699
700     TRACE_smpi_comm_out(my_proc_id);
701   }
702 };
703
704 class GatherVAction : public ReplayAction<GatherVArgParser> {
705 public:
706   GatherVAction(std::string name) : ReplayAction(name) {}
707   void kernel(simgrid::xbt::ReplayAction& action) override
708   {
709     int rank = MPI_COMM_WORLD->rank();
710
711     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
712                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
713                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
714
715     if (name == "gatherV") {
716       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
717                      (rank == args.root) ? recv_buffer(args.recv_size_sum  * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
718                      MPI_COMM_WORLD);
719     }
720     else {
721       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
722                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
723                     MPI_COMM_WORLD);
724     }
725
726     TRACE_smpi_comm_out(my_proc_id);
727   }
728 };
729
730 class ScatterAction : public ReplayAction<ScatterArgParser> {
731 public:
732   ScatterAction() : ReplayAction("scatter") {}
733   void kernel(simgrid::xbt::ReplayAction& action) override
734   {
735     int rank = MPI_COMM_WORLD->rank();
736     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
737                                                                           Datatype::encode(args.datatype1),
738                                                                           Datatype::encode(args.datatype2)));
739
740     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
741                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
742
743     TRACE_smpi_comm_out(my_proc_id);
744   }
745 };
746
747
748 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
749 public:
750   ScatterVAction() : ReplayAction("scatterV") {}
751   void kernel(simgrid::xbt::ReplayAction& action) override
752   {
753     int rank = MPI_COMM_WORLD->rank();
754     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
755           nullptr, Datatype::encode(args.datatype1),
756           Datatype::encode(args.datatype2)));
757
758     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(), 
759         args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
760         MPI_COMM_WORLD);
761
762     TRACE_smpi_comm_out(my_proc_id);
763   }
764 };
765
766 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
767 public:
768   ReduceScatterAction() : ReplayAction("reduceScatter") {}
769   void kernel(simgrid::xbt::ReplayAction& action) override
770   {
771     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
772                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
773                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
774                                                          Datatype::encode(args.datatype1)));
775
776     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()), recv_buffer(args.recv_size_sum * args.datatype1->size()), 
777                           args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
778
779     smpi_execute_flops(args.comp_size);
780     TRACE_smpi_comm_out(my_proc_id);
781   }
782 };
783
784 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
785 public:
786   AllToAllVAction() : ReplayAction("allToAllV") {}
787   void kernel(simgrid::xbt::ReplayAction& action) override
788   {
789     TRACE_smpi_comm_in(my_proc_id, __func__,
790                        new simgrid::instr::VarCollTIData(
791                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
792                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
793
794     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
795                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
796
797     TRACE_smpi_comm_out(my_proc_id);
798   }
799 };
800 } // Replay Namespace
801 }} // namespace simgrid::smpi
802
803 /** @brief Only initialize the replay, don't do it for real */
804 void smpi_replay_init(int* argc, char*** argv)
805 {
806   simgrid::smpi::Process::init(argc, argv);
807   smpi_process()->mark_as_initialized();
808   smpi_process()->set_replaying(true);
809
810   int my_proc_id = simgrid::s4u::this_actor::getPid();
811   TRACE_smpi_init(my_proc_id);
812   TRACE_smpi_computing_init(my_proc_id);
813   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
814   TRACE_smpi_comm_out(my_proc_id);
815   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
816   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
817   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
818   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
819   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
820
821   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
822   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
823   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
824   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
825   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
826   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
827   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
828   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
829   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
830   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
831   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
832   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
833   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllVAction().execute(action); });
834   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
835   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterAction().execute(action); });
836   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("gatherV").execute(action); });
837   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterVAction().execute(action); });
838   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
839   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("allGatherV").execute(action); });
840   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceScatterAction().execute(action); });
841   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
842
843   //if we have a delayed start, sleep here.
844   if(*argc>2){
845     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
846     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
847     smpi_execute_flops(value);
848   } else {
849     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
850     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
851     smpi_execute_flops(0.0);
852   }
853 }
854
855 /** @brief actually run the replay after initialization */
856 void smpi_replay_main(int* argc, char*** argv)
857 {
858   simgrid::xbt::replay_runner(*argc, *argv);
859
860   /* and now, finalize everything */
861   /* One active process will stop. Decrease the counter*/
862   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
863   if (not get_reqq_self()->empty()) {
864     unsigned int count_requests=get_reqq_self()->size();
865     MPI_Request requests[count_requests];
866     MPI_Status status[count_requests];
867     unsigned int i=0;
868
869     for (auto const& req : *get_reqq_self()) {
870       requests[i] = req;
871       i++;
872     }
873     simgrid::smpi::Request::waitall(count_requests, requests, status);
874   }
875   delete get_reqq_self();
876   active_processes--;
877
878   if(active_processes==0){
879     /* Last process alive speaking: end the simulated timer */
880     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
881     smpi_free_replay_tmp_buffers();
882   }
883
884   TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
885                      new simgrid::instr::NoOpTIData("finalize"));
886
887   smpi_process()->finalize();
888
889   TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
890   TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
891 }
892
893 /** @brief chain a replay initialization and a replay start */
894 void smpi_replay_run(int* argc, char*** argv)
895 {
896   smpi_replay_init(argc, argv);
897   smpi_replay_main(argc, argv);
898 }