Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
start snake_casing s4u::Actor
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%zu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional));    \
38   }
39
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
41 {
42   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43     std::string s = boost::algorithm::join(action, " ");
44     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
45   }
46 }
47
48 static std::vector<MPI_Request>* get_reqq_self()
49 {
50   return reqq.at(simgrid::s4u::this_actor::getPid());
51 }
52
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 {
55   reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
56 }
57
58 /* Helper function */
59 static double parse_double(std::string string)
60 {
61   return xbt_str_parse_double(string.c_str(), "%s is not a double");
62 }
63
64 namespace simgrid {
65 namespace smpi {
66
67 namespace replay {
68 class ActionArgParser {
69 public:
70   virtual ~ActionArgParser() = default;
71   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 class CollCommParser : public ActionArgParser {
104 public:
105   double size;
106   double comm_size;
107   double comp_size;
108   int send_size;
109   int recv_size;
110   int root = 0;
111   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
113 };
114
115 class BcastArgParser : public CollCommParser {
116 public:
117   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
118   {
119     CHECK_ACTION_PARAMS(action, 1, 2)
120     size = parse_double(action[2]);
121     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122     if (action.size() > 4)
123       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
124   }
125 };
126
127 class ReduceArgParser : public CollCommParser {
128 public:
129   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
130   {
131     CHECK_ACTION_PARAMS(action, 2, 2)
132     comm_size = parse_double(action[2]);
133     comp_size = parse_double(action[3]);
134     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
135     if (action.size() > 5)
136       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
137   }
138 };
139
140 class AllReduceArgParser : public CollCommParser {
141 public:
142   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
143   {
144     CHECK_ACTION_PARAMS(action, 2, 1)
145     comm_size = parse_double(action[2]);
146     comp_size = parse_double(action[3]);
147     if (action.size() > 4)
148       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
149   }
150 };
151
152 class AllToAllArgParser : public CollCommParser {
153 public:
154   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
155   {
156     CHECK_ACTION_PARAMS(action, 2, 1)
157     comm_size = MPI_COMM_WORLD->size();
158     send_size = parse_double(action[2]);
159     recv_size = parse_double(action[3]);
160
161     if (action.size() > 4)
162       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163     if (action.size() > 5)
164       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
165   }
166 };
167
168 class GatherArgParser : public CollCommParser {
169 public:
170   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
171   {
172     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
173           0 gather 68 68 0 0 0
174         where:
175           1) 68 is the sendcounts
176           2) 68 is the recvcounts
177           3) 0 is the root node
178           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180     */
181     CHECK_ACTION_PARAMS(action, 2, 3)
182     comm_size = MPI_COMM_WORLD->size();
183     send_size = parse_double(action[2]);
184     recv_size = parse_double(action[3]);
185
186     if (name == "gather") {
187       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
188       if (action.size() > 5)
189         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190       if (action.size() > 6)
191         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
192     }
193     else {
194       if (action.size() > 4)
195         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196       if (action.size() > 5)
197         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
198     }
199   }
200 };
201
202 class GatherVArgParser : public CollCommParser {
203 public:
204   int recv_size_sum;
205   std::shared_ptr<std::vector<int>> recvcounts;
206   std::vector<int> disps;
207   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
208   {
209     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210          0 gather 68 68 10 10 10 0 0 0
211        where:
212          1) 68 is the sendcount
213          2) 68 10 10 10 is the recvcounts
214          3) 0 is the root node
215          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217     */
218     comm_size = MPI_COMM_WORLD->size();
219     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220     send_size = parse_double(action[2]);
221     disps     = std::vector<int>(comm_size, 0);
222     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
223
224     if (name == "gatherV") {
225       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226       if (action.size() > 4 + comm_size)
227         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228       if (action.size() > 5 + comm_size)
229         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
230     }
231     else {
232       int datatype_index = 0;
233       int disp_index     = 0;
234       /* The 3 comes from "0 gather <sendcount>", which must always be present.
235        * The + comm_size is the recvcounts array, which must also be present
236        */
237       if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
238         datatype_index = 3 + comm_size;
239         disp_index     = datatype_index + 1;
240         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
241         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
242       } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
243         disp_index     = 3 + comm_size;
244       } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
245         datatype_index = 3 + comm_size;
246         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
247         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
248       }
249
250       if (disp_index != 0) {
251         for (unsigned int i = 0; i < comm_size; i++)
252           disps[i]          = std::stoi(action[disp_index + i]);
253       }
254     }
255
256     for (unsigned int i = 0; i < comm_size; i++) {
257       (*recvcounts)[i] = std::stoi(action[i + 3]);
258     }
259     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
260   }
261 };
262
263 class ScatterArgParser : public CollCommParser {
264 public:
265   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
266   {
267     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
268           0 gather 68 68 0 0 0
269         where:
270           1) 68 is the sendcounts
271           2) 68 is the recvcounts
272           3) 0 is the root node
273           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
274           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
275     */
276     CHECK_ACTION_PARAMS(action, 2, 3)
277     comm_size   = MPI_COMM_WORLD->size();
278     send_size   = parse_double(action[2]);
279     recv_size   = parse_double(action[3]);
280     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
281     if (action.size() > 5)
282       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
283     if (action.size() > 6)
284       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
285   }
286 };
287
288 class ScatterVArgParser : public CollCommParser {
289 public:
290   int recv_size_sum;
291   int send_size_sum;
292   std::shared_ptr<std::vector<int>> sendcounts;
293   std::vector<int> disps;
294   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
295   {
296     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
297        0 gather 68 10 10 10 68 0 0 0
298         where:
299         1) 68 10 10 10 is the sendcounts
300         2) 68 is the recvcount
301         3) 0 is the root node
302         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304     */
305     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
306     recv_size  = parse_double(action[2 + comm_size]);
307     disps      = std::vector<int>(comm_size, 0);
308     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
309
310     if (action.size() > 5 + comm_size)
311       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
312     if (action.size() > 5 + comm_size)
313       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
314
315     for (unsigned int i = 0; i < comm_size; i++) {
316       (*sendcounts)[i] = std::stoi(action[i + 2]);
317     }
318     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
319     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
320   }
321 };
322
323 class ReduceScatterArgParser : public CollCommParser {
324 public:
325   int recv_size_sum;
326   std::shared_ptr<std::vector<int>> recvcounts;
327   std::vector<int> disps;
328   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
329   {
330     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
331          0 reduceScatter 275427 275427 275427 204020 11346849 0
332        where:
333          1) The first four values after the name of the action declare the recvcounts array
334          2) The value 11346849 is the amount of instructions
335          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
336     */
337     comm_size = MPI_COMM_WORLD->size();
338     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
339     comp_size = parse_double(action[2+comm_size]);
340     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
341     if (action.size() > 3 + comm_size)
342       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
343
344     for (unsigned int i = 0; i < comm_size; i++) {
345       recvcounts->push_back(std::stoi(action[i + 2]));
346     }
347     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
348   }
349 };
350
351 class AllToAllVArgParser : public CollCommParser {
352 public:
353   int recv_size_sum;
354   int send_size_sum;
355   std::shared_ptr<std::vector<int>> recvcounts;
356   std::shared_ptr<std::vector<int>> sendcounts;
357   std::vector<int> senddisps;
358   std::vector<int> recvdisps;
359   int send_buf_size;
360   int recv_buf_size;
361   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
362   {
363     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
364           0 allToAllV 100 1 7 10 12 100 1 70 10 5
365        where:
366         1) 100 is the size of the send buffer *sizeof(int),
367         2) 1 7 10 12 is the sendcounts array
368         3) 100*sizeof(int) is the size of the receiver buffer
369         4)  1 70 10 5 is the recvcounts array
370     */
371     comm_size = MPI_COMM_WORLD->size();
372     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
373     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
374     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
375     senddisps  = std::vector<int>(comm_size, 0);
376     recvdisps  = std::vector<int>(comm_size, 0);
377
378     if (action.size() > 5 + 2 * comm_size)
379       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
380     if (action.size() > 5 + 2 * comm_size)
381       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
382
383     send_buf_size=parse_double(action[2]);
384     recv_buf_size=parse_double(action[3+comm_size]);
385     for (unsigned int i = 0; i < comm_size; i++) {
386       (*sendcounts)[i] = std::stoi(action[3 + i]);
387       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
388     }
389     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
390     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
391   }
392 };
393
394 template <class T> class ReplayAction {
395 protected:
396   const std::string name;
397   const int my_proc_id;
398   T args;
399
400 public:
401   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
402   virtual ~ReplayAction() = default;
403
404   virtual void execute(simgrid::xbt::ReplayAction& action)
405   {
406     // Needs to be re-initialized for every action, hence here
407     double start_time = smpi_process()->simulated_elapsed();
408     args.parse(action, name);
409     kernel(action);
410     if (name != "Init")
411       log_timed_action(action, start_time);
412   }
413
414   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
415
416   void* send_buffer(int size)
417   {
418     return smpi_get_tmp_sendbuffer(size);
419   }
420
421   void* recv_buffer(int size)
422   {
423     return smpi_get_tmp_recvbuffer(size);
424   }
425 };
426
427 class WaitAction : public ReplayAction<ActionArgParser> {
428 public:
429   WaitAction() : ReplayAction("Wait") {}
430   void kernel(simgrid::xbt::ReplayAction& action) override
431   {
432     std::string s = boost::algorithm::join(action, " ");
433     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
434     MPI_Request request = get_reqq_self()->back();
435     get_reqq_self()->pop_back();
436
437     if (request == nullptr) {
438       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
439        * return.*/
440       return;
441     }
442
443     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
444
445     // Must be taken before Request::wait() since the request may be set to
446     // MPI_REQUEST_NULL by Request::wait!
447     int src                  = request->comm()->group()->rank(request->src());
448     int dst                  = request->comm()->group()->rank(request->dst());
449     bool is_wait_for_receive = (request->flags() & RECV);
450     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
451     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
452
453     MPI_Status status;
454     Request::wait(&request, &status);
455
456     TRACE_smpi_comm_out(rank);
457     if (is_wait_for_receive)
458       TRACE_smpi_recv(src, dst, 0);
459   }
460 };
461
462 class SendAction : public ReplayAction<SendRecvParser> {
463 public:
464   SendAction() = delete;
465   explicit SendAction(std::string name) : ReplayAction(name) {}
466   void kernel(simgrid::xbt::ReplayAction& action) override
467   {
468     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
469
470     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
471                                                                              Datatype::encode(args.datatype1)));
472     if (not TRACE_smpi_view_internals())
473       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
474
475     if (name == "send") {
476       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
477     } else if (name == "Isend") {
478       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
479       get_reqq_self()->push_back(request);
480     } else {
481       xbt_die("Don't know this action, %s", name.c_str());
482     }
483
484     TRACE_smpi_comm_out(my_proc_id);
485   }
486 };
487
488 class RecvAction : public ReplayAction<SendRecvParser> {
489 public:
490   RecvAction() = delete;
491   explicit RecvAction(std::string name) : ReplayAction(name) {}
492   void kernel(simgrid::xbt::ReplayAction& action) override
493   {
494     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
495
496     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
497                                                                              Datatype::encode(args.datatype1)));
498
499     MPI_Status status;
500     // unknown size from the receiver point of view
501     if (args.size <= 0.0) {
502       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
503       args.size = status.count;
504     }
505
506     if (name == "recv") {
507       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
508     } else if (name == "Irecv") {
509       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
510       get_reqq_self()->push_back(request);
511     }
512
513     TRACE_smpi_comm_out(my_proc_id);
514     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
515     if (name == "recv" && not TRACE_smpi_view_internals()) {
516       TRACE_smpi_recv(src_traced, my_proc_id, 0);
517     }
518   }
519 };
520
521 class ComputeAction : public ReplayAction<ComputeParser> {
522 public:
523   ComputeAction() : ReplayAction("compute") {}
524   void kernel(simgrid::xbt::ReplayAction& action) override
525   {
526     TRACE_smpi_computing_in(my_proc_id, args.flops);
527     smpi_execute_flops(args.flops);
528     TRACE_smpi_computing_out(my_proc_id);
529   }
530 };
531
532 class TestAction : public ReplayAction<ActionArgParser> {
533 public:
534   TestAction() : ReplayAction("Test") {}
535   void kernel(simgrid::xbt::ReplayAction& action) override
536   {
537     MPI_Request request = get_reqq_self()->back();
538     get_reqq_self()->pop_back();
539     // if request is null here, this may mean that a previous test has succeeded
540     // Different times in traced application and replayed version may lead to this
541     // In this case, ignore the extra calls.
542     if (request != nullptr) {
543       TRACE_smpi_testing_in(my_proc_id);
544
545       MPI_Status status;
546       int flag = Request::test(&request, &status);
547
548       XBT_DEBUG("MPI_Test result: %d", flag);
549       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
550        * nullptr.*/
551       get_reqq_self()->push_back(request);
552
553       TRACE_smpi_testing_out(my_proc_id);
554     }
555   }
556 };
557
558 class InitAction : public ReplayAction<ActionArgParser> {
559 public:
560   InitAction() : ReplayAction("Init") {}
561   void kernel(simgrid::xbt::ReplayAction& action) override
562   {
563     CHECK_ACTION_PARAMS(action, 0, 1)
564     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
565                                            : MPI_BYTE;  // default TAU datatype
566
567     /* start a simulated timer */
568     smpi_process()->simulated_start();
569     /*initialize the number of active processes */
570     active_processes = smpi_process_count();
571
572     set_reqq_self(new std::vector<MPI_Request>);
573   }
574 };
575
576 class CommunicatorAction : public ReplayAction<ActionArgParser> {
577 public:
578   CommunicatorAction() : ReplayAction("Comm") {}
579   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
580 };
581
582 class WaitAllAction : public ReplayAction<ActionArgParser> {
583 public:
584   WaitAllAction() : ReplayAction("waitAll") {}
585   void kernel(simgrid::xbt::ReplayAction& action) override
586   {
587     const unsigned int count_requests = get_reqq_self()->size();
588
589     if (count_requests > 0) {
590       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
591       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
592       for (const auto& req : (*get_reqq_self())) {
593         if (req && (req->flags() & RECV)) {
594           sender_receiver.push_back({req->src(), req->dst()});
595         }
596       }
597       MPI_Status status[count_requests];
598       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
599
600       for (auto& pair : sender_receiver) {
601         TRACE_smpi_recv(pair.first, pair.second, 0);
602       }
603       TRACE_smpi_comm_out(my_proc_id);
604     }
605   }
606 };
607
608 class BarrierAction : public ReplayAction<ActionArgParser> {
609 public:
610   BarrierAction() : ReplayAction("barrier") {}
611   void kernel(simgrid::xbt::ReplayAction& action) override
612   {
613     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
614     Colls::barrier(MPI_COMM_WORLD);
615     TRACE_smpi_comm_out(my_proc_id);
616   }
617 };
618
619 class BcastAction : public ReplayAction<BcastArgParser> {
620 public:
621   BcastAction() : ReplayAction("bcast") {}
622   void kernel(simgrid::xbt::ReplayAction& action) override
623   {
624     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
625                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
626                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
627
628     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
629
630     TRACE_smpi_comm_out(my_proc_id);
631   }
632 };
633
634 class ReduceAction : public ReplayAction<ReduceArgParser> {
635 public:
636   ReduceAction() : ReplayAction("reduce") {}
637   void kernel(simgrid::xbt::ReplayAction& action) override
638   {
639     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
640                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
641                                                       args.comp_size, args.comm_size, -1,
642                                                       Datatype::encode(args.datatype1), ""));
643
644     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
645         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
646     smpi_execute_flops(args.comp_size);
647
648     TRACE_smpi_comm_out(my_proc_id);
649   }
650 };
651
652 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
653 public:
654   AllReduceAction() : ReplayAction("allReduce") {}
655   void kernel(simgrid::xbt::ReplayAction& action) override
656   {
657     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
658                                                                                 Datatype::encode(args.datatype1), ""));
659
660     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
661         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
662     smpi_execute_flops(args.comp_size);
663
664     TRACE_smpi_comm_out(my_proc_id);
665   }
666 };
667
668 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
669 public:
670   AllToAllAction() : ReplayAction("allToAll") {}
671   void kernel(simgrid::xbt::ReplayAction& action) override
672   {
673     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
674                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
675                                                     Datatype::encode(args.datatype1),
676                                                     Datatype::encode(args.datatype2)));
677
678     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
679                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
680                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
681
682     TRACE_smpi_comm_out(my_proc_id);
683   }
684 };
685
686 class GatherAction : public ReplayAction<GatherArgParser> {
687 public:
688   explicit GatherAction(std::string name) : ReplayAction(name) {}
689   void kernel(simgrid::xbt::ReplayAction& action) override
690   {
691     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
692                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
693
694     if (name == "gather") {
695       int rank = MPI_COMM_WORLD->rank();
696       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
698     }
699     else
700       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
702
703     TRACE_smpi_comm_out(my_proc_id);
704   }
705 };
706
707 class GatherVAction : public ReplayAction<GatherVArgParser> {
708 public:
709   explicit GatherVAction(std::string name) : ReplayAction(name) {}
710   void kernel(simgrid::xbt::ReplayAction& action) override
711   {
712     int rank = MPI_COMM_WORLD->rank();
713
714     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
715                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
716                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
717
718     if (name == "gatherV") {
719       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
720                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
721                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
722     }
723     else {
724       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
725                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
726                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
727     }
728
729     TRACE_smpi_comm_out(my_proc_id);
730   }
731 };
732
733 class ScatterAction : public ReplayAction<ScatterArgParser> {
734 public:
735   ScatterAction() : ReplayAction("scatter") {}
736   void kernel(simgrid::xbt::ReplayAction& action) override
737   {
738     int rank = MPI_COMM_WORLD->rank();
739     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
740                                                                           Datatype::encode(args.datatype1),
741                                                                           Datatype::encode(args.datatype2)));
742
743     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
744                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
745
746     TRACE_smpi_comm_out(my_proc_id);
747   }
748 };
749
750
751 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
752 public:
753   ScatterVAction() : ReplayAction("scatterV") {}
754   void kernel(simgrid::xbt::ReplayAction& action) override
755   {
756     int rank = MPI_COMM_WORLD->rank();
757     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
758           nullptr, Datatype::encode(args.datatype1),
759           Datatype::encode(args.datatype2)));
760
761     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
762                     args.sendcounts->data(), args.disps.data(), args.datatype1,
763                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
764                     MPI_COMM_WORLD);
765
766     TRACE_smpi_comm_out(my_proc_id);
767   }
768 };
769
770 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
771 public:
772   ReduceScatterAction() : ReplayAction("reduceScatter") {}
773   void kernel(simgrid::xbt::ReplayAction& action) override
774   {
775     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
776                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
777                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
778                                                          Datatype::encode(args.datatype1)));
779
780     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
781                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
782                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
783
784     smpi_execute_flops(args.comp_size);
785     TRACE_smpi_comm_out(my_proc_id);
786   }
787 };
788
789 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
790 public:
791   AllToAllVAction() : ReplayAction("allToAllV") {}
792   void kernel(simgrid::xbt::ReplayAction& action) override
793   {
794     TRACE_smpi_comm_in(my_proc_id, __func__,
795                        new simgrid::instr::VarCollTIData(
796                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
797                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
798
799     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
800                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
801
802     TRACE_smpi_comm_out(my_proc_id);
803   }
804 };
805 } // Replay Namespace
806 }} // namespace simgrid::smpi
807
808 /** @brief Only initialize the replay, don't do it for real */
809 void smpi_replay_init(int* argc, char*** argv)
810 {
811   simgrid::smpi::Process::init(argc, argv);
812   smpi_process()->mark_as_initialized();
813   smpi_process()->set_replaying(true);
814
815   int my_proc_id = simgrid::s4u::this_actor::getPid();
816   TRACE_smpi_init(my_proc_id);
817   TRACE_smpi_computing_init(my_proc_id);
818   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
819   TRACE_smpi_comm_out(my_proc_id);
820   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
821   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
822   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
823   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
824   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
825
826   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
827   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
828   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
829   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
830   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
831   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
832   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
833   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
834   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
835   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
836   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
837   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
838   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
839   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
840   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
841   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
842   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
843   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
844   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
845   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
846   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
847
848   //if we have a delayed start, sleep here.
849   if(*argc>2){
850     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
851     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
852     smpi_execute_flops(value);
853   } else {
854     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
855     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
856     smpi_execute_flops(0.0);
857   }
858 }
859
860 /** @brief actually run the replay after initialization */
861 void smpi_replay_main(int* argc, char*** argv)
862 {
863   simgrid::xbt::replay_runner(*argc, *argv);
864
865   /* and now, finalize everything */
866   /* One active process will stop. Decrease the counter*/
867   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
868   if (not get_reqq_self()->empty()) {
869     unsigned int count_requests=get_reqq_self()->size();
870     MPI_Request requests[count_requests];
871     MPI_Status status[count_requests];
872     unsigned int i=0;
873
874     for (auto const& req : *get_reqq_self()) {
875       requests[i] = req;
876       i++;
877     }
878     simgrid::smpi::Request::waitall(count_requests, requests, status);
879   }
880   delete get_reqq_self();
881   active_processes--;
882
883   if(active_processes==0){
884     /* Last process alive speaking: end the simulated timer */
885     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
886     smpi_free_replay_tmp_buffers();
887   }
888
889   TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
890                      new simgrid::instr::NoOpTIData("finalize"));
891
892   smpi_process()->finalize();
893
894   TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
895   TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
896 }
897
898 /** @brief chain a replay initialization and a replay start */
899 void smpi_replay_run(int* argc, char*** argv)
900 {
901   smpi_replay_init(argc, argv);
902   smpi_replay_main(argc, argv);
903 }