Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use standard __func__ instead of __FUNCTION__.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%zu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional));    \
38   }
39
40 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
41 {
42   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
43     std::string s = boost::algorithm::join(action, " ");
44     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
45   }
46 }
47
48 static std::vector<MPI_Request>* get_reqq_self()
49 {
50   return reqq.at(Actor::self()->getPid());
51 }
52
53 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 {
55    reqq.insert({Actor::self()->getPid(), mpi_request});
56 }
57
58 /* Helper function */
59 static double parse_double(std::string string)
60 {
61   return xbt_str_parse_double(string.c_str(), "%s is not a double");
62 }
63
64 namespace simgrid {
65 namespace smpi {
66
67 namespace Replay {
68 class ActionArgParser {
69 public:
70   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
71 };
72
73 class SendRecvParser : public ActionArgParser {
74 public:
75   /* communication partner; if we send, this is the receiver and vice versa */
76   int partner;
77   double size;
78   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
79
80   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
81   {
82     CHECK_ACTION_PARAMS(action, 2, 1)
83     partner = std::stoi(action[2]);
84     size    = parse_double(action[3]);
85     if (action.size() > 4)
86       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
87   }
88 };
89
90 class ComputeParser : public ActionArgParser {
91 public:
92   /* communication partner; if we send, this is the receiver and vice versa */
93   double flops;
94
95   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
96   {
97     CHECK_ACTION_PARAMS(action, 1, 0)
98     flops = parse_double(action[2]);
99   }
100 };
101
102 class CollCommParser : public ActionArgParser {
103 public:
104   double size;
105   double comm_size;
106   double comp_size;
107   int send_size;
108   int recv_size;
109   int root = 0;
110   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
111   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
112 };
113
114 class BcastArgParser : public CollCommParser {
115 public:
116   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
117   {
118     CHECK_ACTION_PARAMS(action, 1, 2)
119     size = parse_double(action[2]);
120     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
121     if (action.size() > 4)
122       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
123   }
124 };
125
126 class ReduceArgParser : public CollCommParser {
127 public:
128   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
129   {
130     CHECK_ACTION_PARAMS(action, 2, 2)
131     comm_size = parse_double(action[2]);
132     comp_size = parse_double(action[3]);
133     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
134     if (action.size() > 5)
135       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
136   }
137 };
138
139 class AllReduceArgParser : public CollCommParser {
140 public:
141   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
142   {
143     CHECK_ACTION_PARAMS(action, 2, 1)
144     comm_size = parse_double(action[2]);
145     comp_size = parse_double(action[3]);
146     if (action.size() > 4)
147       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
148   }
149 };
150
151 class AllToAllArgParser : public CollCommParser {
152 public:
153   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
154   {
155     CHECK_ACTION_PARAMS(action, 2, 1)
156     comm_size = MPI_COMM_WORLD->size();
157     send_size = parse_double(action[2]);
158     recv_size = parse_double(action[3]);
159
160     if (action.size() > 4)
161       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
162     if (action.size() > 5)
163       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
164   }
165 };
166
167 class GatherArgParser : public CollCommParser {
168 public:
169   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
170   {
171     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
172           0 gather 68 68 0 0 0
173         where:
174           1) 68 is the sendcounts
175           2) 68 is the recvcounts
176           3) 0 is the root node
177           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
178           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
179     */
180     CHECK_ACTION_PARAMS(action, 2, 3)
181     comm_size = MPI_COMM_WORLD->size();
182     send_size = parse_double(action[2]);
183     recv_size = parse_double(action[3]);
184
185     if (name == "gather") {
186       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
187       if (action.size() > 5)
188         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
189       if (action.size() > 6)
190         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
191     }
192     else {
193       if (action.size() > 4)
194         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
195       if (action.size() > 5)
196         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
197     }
198   }
199 };
200
201 class GatherVArgParser : public CollCommParser {
202 public:
203   int recv_size_sum;
204   std::shared_ptr<std::vector<int>> recvcounts;
205   std::vector<int> disps;
206   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
207   {
208     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
209          0 gather 68 68 10 10 10 0 0 0
210        where:
211          1) 68 is the sendcount
212          2) 68 10 10 10 is the recvcounts
213          3) 0 is the root node
214          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
216     */
217     comm_size = MPI_COMM_WORLD->size();
218     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
219     send_size = parse_double(action[2]);
220     disps     = std::vector<int>(comm_size, 0);
221     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
222
223     if (name == "gatherV") {
224       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
225       if (action.size() > 4 + comm_size)
226         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
227       if (action.size() > 5 + comm_size)
228         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
229     }
230     else {
231       int datatype_index = 0, disp_index = 0;
232       if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
233         datatype_index = 3 + comm_size;
234         disp_index     = datatype_index + 1;
235       } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
236         datatype_index = -1;
237         disp_index     = 3 + comm_size;
238       } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
239         datatype_index = 3 + comm_size;
240       }
241
242       if (disp_index != 0) {
243         for (unsigned int i = 0; i < comm_size; i++)
244           disps[i]          = std::stoi(action[disp_index + i]);
245       }
246
247       datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
248       datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
249     }
250
251     for (unsigned int i = 0; i < comm_size; i++) {
252       (*recvcounts)[i] = std::stoi(action[i + 3]);
253     }
254     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
255   }
256 };
257
258 class ScatterArgParser : public CollCommParser {
259 public:
260   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
261   {
262     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
263           0 gather 68 68 0 0 0
264         where:
265           1) 68 is the sendcounts
266           2) 68 is the recvcounts
267           3) 0 is the root node
268           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
269           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
270     */
271     CHECK_ACTION_PARAMS(action, 2, 3)
272     comm_size   = MPI_COMM_WORLD->size();
273     send_size   = parse_double(action[2]);
274     recv_size   = parse_double(action[3]);
275     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
276     if (action.size() > 5)
277       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
278     if (action.size() > 6)
279       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
280   }
281 };
282
283 class ScatterVArgParser : public CollCommParser {
284 public:
285   int recv_size_sum;
286   int send_size_sum;
287   std::shared_ptr<std::vector<int>> sendcounts;
288   std::vector<int> disps;
289   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
290   {
291     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
292        0 gather 68 10 10 10 68 0 0 0
293         where:
294         1) 68 10 10 10 is the sendcounts
295         2) 68 is the recvcount
296         3) 0 is the root node
297         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
298         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
299     */
300     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
301     recv_size  = parse_double(action[2 + comm_size]);
302     disps      = std::vector<int>(comm_size, 0);
303     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
304
305     if (action.size() > 5 + comm_size)
306       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
307     if (action.size() > 5 + comm_size)
308       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
309
310     for (unsigned int i = 0; i < comm_size; i++) {
311       (*sendcounts)[i] = std::stoi(action[i + 2]);
312     }
313     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
314     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
315   }
316 };
317
318 class ReduceScatterArgParser : public CollCommParser {
319 public:
320   int recv_size_sum;
321   std::shared_ptr<std::vector<int>> recvcounts;
322   std::vector<int> disps;
323   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
324   {
325     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
326          0 reduceScatter 275427 275427 275427 204020 11346849 0
327        where:
328          1) The first four values after the name of the action declare the recvcounts array
329          2) The value 11346849 is the amount of instructions
330          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
331     */
332     comm_size = MPI_COMM_WORLD->size();
333     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
334     comp_size = parse_double(action[2+comm_size]);
335     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
336     if (action.size() > 3 + comm_size)
337       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
338
339     for (unsigned int i = 0; i < comm_size; i++) {
340       recvcounts->push_back(std::stoi(action[i + 2]));
341     }
342     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
343   }
344 };
345
346 class AllToAllVArgParser : public CollCommParser {
347 public:
348   int recv_size_sum;
349   int send_size_sum;
350   std::shared_ptr<std::vector<int>> recvcounts;
351   std::shared_ptr<std::vector<int>> sendcounts;
352   std::vector<int> senddisps;
353   std::vector<int> recvdisps;
354   int send_buf_size;
355   int recv_buf_size;
356   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
357   {
358     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
359           0 allToAllV 100 1 7 10 12 100 1 70 10 5
360        where:
361         1) 100 is the size of the send buffer *sizeof(int),
362         2) 1 7 10 12 is the sendcounts array
363         3) 100*sizeof(int) is the size of the receiver buffer
364         4)  1 70 10 5 is the recvcounts array
365     */
366     comm_size = MPI_COMM_WORLD->size();
367     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
368     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
369     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
370     senddisps  = std::vector<int>(comm_size, 0);
371     recvdisps  = std::vector<int>(comm_size, 0);
372
373     if (action.size() > 5 + 2 * comm_size)
374       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
375     if (action.size() > 5 + 2 * comm_size)
376       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
377
378     send_buf_size=parse_double(action[2]);
379     recv_buf_size=parse_double(action[3+comm_size]);
380     for (unsigned int i = 0; i < comm_size; i++) {
381       (*sendcounts)[i] = std::stoi(action[3 + i]);
382       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
383     }
384     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
385     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
386   }
387 };
388
389 template <class T> class ReplayAction {
390 protected:
391   const std::string name;
392   T args;
393
394   int my_proc_id;
395
396 public:
397   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
398
399   virtual void execute(simgrid::xbt::ReplayAction& action)
400   {
401     // Needs to be re-initialized for every action, hence here
402     double start_time = smpi_process()->simulated_elapsed();
403     args.parse(action, name);
404     kernel(action);
405     if (name != "Init")
406       log_timed_action(action, start_time);
407   }
408
409   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
410
411   void* send_buffer(int size)
412   {
413     return smpi_get_tmp_sendbuffer(size);
414   }
415
416   void* recv_buffer(int size)
417   {
418     return smpi_get_tmp_recvbuffer(size);
419   }
420 };
421
422 class WaitAction : public ReplayAction<ActionArgParser> {
423 public:
424   WaitAction() : ReplayAction("Wait") {}
425   void kernel(simgrid::xbt::ReplayAction& action) override
426   {
427     std::string s = boost::algorithm::join(action, " ");
428     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
429     MPI_Request request = get_reqq_self()->back();
430     get_reqq_self()->pop_back();
431
432     if (request == nullptr) {
433       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
434        * return.*/
435       return;
436     }
437
438     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
439
440     // Must be taken before Request::wait() since the request may be set to
441     // MPI_REQUEST_NULL by Request::wait!
442     int src                  = request->comm()->group()->rank(request->src());
443     int dst                  = request->comm()->group()->rank(request->dst());
444     bool is_wait_for_receive = (request->flags() & RECV);
445     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
446     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
447
448     MPI_Status status;
449     Request::wait(&request, &status);
450
451     TRACE_smpi_comm_out(rank);
452     if (is_wait_for_receive)
453       TRACE_smpi_recv(src, dst, 0);
454   }
455 };
456
457 class SendAction : public ReplayAction<SendRecvParser> {
458 public:
459   SendAction() = delete;
460   SendAction(std::string name) : ReplayAction(name) {}
461   void kernel(simgrid::xbt::ReplayAction& action) override
462   {
463     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
464
465     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
466                                                                              Datatype::encode(args.datatype1)));
467     if (not TRACE_smpi_view_internals())
468       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
469
470     if (name == "send") {
471       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
472     } else if (name == "Isend") {
473       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
474       get_reqq_self()->push_back(request);
475     } else {
476       xbt_die("Don't know this action, %s", name.c_str());
477     }
478
479     TRACE_smpi_comm_out(my_proc_id);
480   }
481 };
482
483 class RecvAction : public ReplayAction<SendRecvParser> {
484 public:
485   RecvAction() = delete;
486   explicit RecvAction(std::string name) : ReplayAction(name) {}
487   void kernel(simgrid::xbt::ReplayAction& action) override
488   {
489     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
490
491     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
492                                                                              Datatype::encode(args.datatype1)));
493
494     MPI_Status status;
495     // unknown size from the receiver point of view
496     if (args.size <= 0.0) {
497       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
498       args.size = status.count;
499     }
500
501     if (name == "recv") {
502       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
503     } else if (name == "Irecv") {
504       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
505       get_reqq_self()->push_back(request);
506     }
507
508     TRACE_smpi_comm_out(my_proc_id);
509     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
510     if (name == "recv" && not TRACE_smpi_view_internals()) {
511       TRACE_smpi_recv(src_traced, my_proc_id, 0);
512     }
513   }
514 };
515
516 class ComputeAction : public ReplayAction<ComputeParser> {
517 public:
518   ComputeAction() : ReplayAction("compute") {}
519   void kernel(simgrid::xbt::ReplayAction& action) override
520   {
521     TRACE_smpi_computing_in(my_proc_id, args.flops);
522     smpi_execute_flops(args.flops);
523     TRACE_smpi_computing_out(my_proc_id);
524   }
525 };
526
527 class TestAction : public ReplayAction<ActionArgParser> {
528 public:
529   TestAction() : ReplayAction("Test") {}
530   void kernel(simgrid::xbt::ReplayAction& action) override
531   {
532     MPI_Request request = get_reqq_self()->back();
533     get_reqq_self()->pop_back();
534     // if request is null here, this may mean that a previous test has succeeded
535     // Different times in traced application and replayed version may lead to this
536     // In this case, ignore the extra calls.
537     if (request != nullptr) {
538       TRACE_smpi_testing_in(my_proc_id);
539
540       MPI_Status status;
541       int flag = Request::test(&request, &status);
542
543       XBT_DEBUG("MPI_Test result: %d", flag);
544       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
545        * nullptr.*/
546       get_reqq_self()->push_back(request);
547
548       TRACE_smpi_testing_out(my_proc_id);
549     }
550   }
551 };
552
553 class InitAction : public ReplayAction<ActionArgParser> {
554 public:
555   InitAction() : ReplayAction("Init") {}
556   void kernel(simgrid::xbt::ReplayAction& action) override
557   {
558     CHECK_ACTION_PARAMS(action, 0, 1)
559     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
560                                            : MPI_BYTE;  // default TAU datatype
561
562     /* start a simulated timer */
563     smpi_process()->simulated_start();
564     /*initialize the number of active processes */
565     active_processes = smpi_process_count();
566
567     set_reqq_self(new std::vector<MPI_Request>);
568   }
569 };
570
571 class CommunicatorAction : public ReplayAction<ActionArgParser> {
572 public:
573   CommunicatorAction() : ReplayAction("Comm") {}
574   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
575 };
576
577 class WaitAllAction : public ReplayAction<ActionArgParser> {
578 public:
579   WaitAllAction() : ReplayAction("waitAll") {}
580   void kernel(simgrid::xbt::ReplayAction& action) override
581   {
582     const unsigned int count_requests = get_reqq_self()->size();
583
584     if (count_requests > 0) {
585       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
586       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
587       for (const auto& req : (*get_reqq_self())) {
588         if (req && (req->flags() & RECV)) {
589           sender_receiver.push_back({req->src(), req->dst()});
590         }
591       }
592       MPI_Status status[count_requests];
593       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
594
595       for (auto& pair : sender_receiver) {
596         TRACE_smpi_recv(pair.first, pair.second, 0);
597       }
598       TRACE_smpi_comm_out(my_proc_id);
599     }
600   }
601 };
602
603 class BarrierAction : public ReplayAction<ActionArgParser> {
604 public:
605   BarrierAction() : ReplayAction("barrier") {}
606   void kernel(simgrid::xbt::ReplayAction& action) override
607   {
608     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
609     Colls::barrier(MPI_COMM_WORLD);
610     TRACE_smpi_comm_out(my_proc_id);
611   }
612 };
613
614 class BcastAction : public ReplayAction<BcastArgParser> {
615 public:
616   BcastAction() : ReplayAction("bcast") {}
617   void kernel(simgrid::xbt::ReplayAction& action) override
618   {
619     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
620                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
621                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
622
623     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
624
625     TRACE_smpi_comm_out(my_proc_id);
626   }
627 };
628
629 class ReduceAction : public ReplayAction<ReduceArgParser> {
630 public:
631   ReduceAction() : ReplayAction("reduce") {}
632   void kernel(simgrid::xbt::ReplayAction& action) override
633   {
634     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
635                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
636                                                       args.comm_size, -1, Datatype::encode(args.datatype1), ""));
637
638     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
639         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
640     smpi_execute_flops(args.comp_size);
641
642     TRACE_smpi_comm_out(my_proc_id);
643   }
644 };
645
646 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
647 public:
648   AllReduceAction() : ReplayAction("allReduce") {}
649   void kernel(simgrid::xbt::ReplayAction& action) override
650   {
651     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
652                                                                                 Datatype::encode(args.datatype1), ""));
653
654     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
655         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
656     smpi_execute_flops(args.comp_size);
657
658     TRACE_smpi_comm_out(my_proc_id);
659   }
660 };
661
662 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
663 public:
664   AllToAllAction() : ReplayAction("allToAll") {}
665   void kernel(simgrid::xbt::ReplayAction& action) override
666   {
667     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
668                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
669                                                     Datatype::encode(args.datatype1),
670                                                     Datatype::encode(args.datatype2)));
671
672     Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()), 
673       args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
674       args.recv_size, args.datatype2, MPI_COMM_WORLD);
675
676     TRACE_smpi_comm_out(my_proc_id);
677   }
678 };
679
680 class GatherAction : public ReplayAction<GatherArgParser> {
681 public:
682   GatherAction(std::string name) : ReplayAction(name) {}
683   void kernel(simgrid::xbt::ReplayAction& action) override
684   {
685     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
686                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
687
688     if (name == "gather") {
689       int rank = MPI_COMM_WORLD->rank();
690       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
691                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
692     }
693     else
694       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
695                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
696
697     TRACE_smpi_comm_out(my_proc_id);
698   }
699 };
700
701 class GatherVAction : public ReplayAction<GatherVArgParser> {
702 public:
703   GatherVAction(std::string name) : ReplayAction(name) {}
704   void kernel(simgrid::xbt::ReplayAction& action) override
705   {
706     int rank = MPI_COMM_WORLD->rank();
707
708     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
709                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
710                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
711
712     if (name == "gatherV") {
713       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
714                      (rank == args.root) ? recv_buffer(args.recv_size_sum  * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
715                      MPI_COMM_WORLD);
716     }
717     else {
718       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
719                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
720                     MPI_COMM_WORLD);
721     }
722
723     TRACE_smpi_comm_out(my_proc_id);
724   }
725 };
726
727 class ScatterAction : public ReplayAction<ScatterArgParser> {
728 public:
729   ScatterAction() : ReplayAction("scatter") {}
730   void kernel(simgrid::xbt::ReplayAction& action) override
731   {
732     int rank = MPI_COMM_WORLD->rank();
733     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
734                                                                           Datatype::encode(args.datatype1),
735                                                                           Datatype::encode(args.datatype2)));
736
737     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
738                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
739
740     TRACE_smpi_comm_out(my_proc_id);
741   }
742 };
743
744
745 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
746 public:
747   ScatterVAction() : ReplayAction("scatterV") {}
748   void kernel(simgrid::xbt::ReplayAction& action) override
749   {
750     int rank = MPI_COMM_WORLD->rank();
751     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
752           nullptr, Datatype::encode(args.datatype1),
753           Datatype::encode(args.datatype2)));
754
755     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(), 
756         args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
757         MPI_COMM_WORLD);
758
759     TRACE_smpi_comm_out(my_proc_id);
760   }
761 };
762
763 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
764 public:
765   ReduceScatterAction() : ReplayAction("reduceScatter") {}
766   void kernel(simgrid::xbt::ReplayAction& action) override
767   {
768     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
769                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
770                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
771                                                          Datatype::encode(args.datatype1)));
772
773     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()), recv_buffer(args.recv_size_sum * args.datatype1->size()), 
774                           args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
775
776     smpi_execute_flops(args.comp_size);
777     TRACE_smpi_comm_out(my_proc_id);
778   }
779 };
780
781 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
782 public:
783   AllToAllVAction() : ReplayAction("allToAllV") {}
784   void kernel(simgrid::xbt::ReplayAction& action) override
785   {
786     TRACE_smpi_comm_in(my_proc_id, __func__,
787                        new simgrid::instr::VarCollTIData(
788                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
789                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
790
791     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
792                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
793
794     TRACE_smpi_comm_out(my_proc_id);
795   }
796 };
797 } // Replay Namespace
798 }} // namespace simgrid::smpi
799
800 /** @brief Only initialize the replay, don't do it for real */
801 void smpi_replay_init(int* argc, char*** argv)
802 {
803   simgrid::smpi::Process::init(argc, argv);
804   smpi_process()->mark_as_initialized();
805   smpi_process()->set_replaying(true);
806
807   int my_proc_id = Actor::self()->getPid();
808   TRACE_smpi_init(my_proc_id);
809   TRACE_smpi_computing_init(my_proc_id);
810   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
811   TRACE_smpi_comm_out(my_proc_id);
812   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
813   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
814   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
815   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
816   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
817
818   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
819   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
820   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
821   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
822   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
823   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
824   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
825   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
826   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
827   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
828   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
829   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
830   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllVAction().execute(action); });
831   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
832   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterAction().execute(action); });
833   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("gatherV").execute(action); });
834   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterVAction().execute(action); });
835   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
836   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("allGatherV").execute(action); });
837   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceScatterAction().execute(action); });
838   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
839
840   //if we have a delayed start, sleep here.
841   if(*argc>2){
842     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
843     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
844     smpi_execute_flops(value);
845   } else {
846     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
847     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
848     smpi_execute_flops(0.0);
849   }
850 }
851
852 /** @brief actually run the replay after initialization */
853 void smpi_replay_main(int* argc, char*** argv)
854 {
855   simgrid::xbt::replay_runner(*argc, *argv);
856
857   /* and now, finalize everything */
858   /* One active process will stop. Decrease the counter*/
859   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
860   if (not get_reqq_self()->empty()) {
861     unsigned int count_requests=get_reqq_self()->size();
862     MPI_Request requests[count_requests];
863     MPI_Status status[count_requests];
864     unsigned int i=0;
865
866     for (auto const& req : *get_reqq_self()) {
867       requests[i] = req;
868       i++;
869     }
870     simgrid::smpi::Request::waitall(count_requests, requests, status);
871   }
872   delete get_reqq_self();
873   active_processes--;
874
875   if(active_processes==0){
876     /* Last process alive speaking: end the simulated timer */
877     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
878     smpi_free_replay_tmp_buffers();
879   }
880
881   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
882
883   smpi_process()->finalize();
884
885   TRACE_smpi_comm_out(Actor::self()->getPid());
886   TRACE_smpi_finalize(Actor::self()->getPid());
887 }
888
889 /** @brief chain a replay initialization and a replay start */
890 void smpi_replay_run(int* argc, char*** argv)
891 {
892   smpi_replay_init(argc, argv);
893   smpi_replay_main(argc, argv);
894 }