Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fca1f30b37b976efe8bb9aec002cba15a04f9106
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 class CollCommParser : public ActionArgParser {
104 public:
105   double size;
106   double comm_size;
107   double comp_size;
108   int send_size;
109   int recv_size;
110   int root = 0;
111   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
113 };
114
115 class BcastArgParser : public CollCommParser {
116 public:
117   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
118   {
119     CHECK_ACTION_PARAMS(action, 1, 2)
120     size = parse_double(action[2]);
121     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122     if (action.size() > 4)
123       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
124   }
125 };
126
127 class ReduceArgParser : public CollCommParser {
128 public:
129   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
130   {
131     CHECK_ACTION_PARAMS(action, 2, 2)
132     comm_size = parse_double(action[2]);
133     comp_size = parse_double(action[3]);
134     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
135     if (action.size() > 5)
136       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
137   }
138 };
139
140 class AllReduceArgParser : public CollCommParser {
141 public:
142   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
143   {
144     CHECK_ACTION_PARAMS(action, 2, 1)
145     comm_size = parse_double(action[2]);
146     comp_size = parse_double(action[3]);
147     if (action.size() > 4)
148       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
149   }
150 };
151
152 class AllToAllArgParser : public CollCommParser {
153 public:
154   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
155   {
156     CHECK_ACTION_PARAMS(action, 2, 1)
157     comm_size = MPI_COMM_WORLD->size();
158     send_size = parse_double(action[2]);
159     recv_size = parse_double(action[3]);
160
161     if (action.size() > 4)
162       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163     if (action.size() > 5)
164       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
165   }
166 };
167
168 class GatherArgParser : public CollCommParser {
169 public:
170   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
171   {
172     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
173           0 gather 68 68 0 0 0
174         where:
175           1) 68 is the sendcounts
176           2) 68 is the recvcounts
177           3) 0 is the root node
178           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180     */
181     CHECK_ACTION_PARAMS(action, 2, 3)
182     comm_size = MPI_COMM_WORLD->size();
183     send_size = parse_double(action[2]);
184     recv_size = parse_double(action[3]);
185
186     if (name == "gather") {
187       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
188       if (action.size() > 5)
189         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190       if (action.size() > 6)
191         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
192     }
193     else {
194       if (action.size() > 4)
195         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196       if (action.size() > 5)
197         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
198     }
199   }
200 };
201
202 class GatherVArgParser : public CollCommParser {
203 public:
204   int recv_sum;
205   std::shared_ptr<std::vector<int>> recvcounts;
206   std::vector<int> disps;
207   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
208   {
209     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210          0 gather 68 68 10 10 10 0 0 0
211        where:
212          1) 68 is the sendcount
213          2) 68 10 10 10 is the recvcounts
214          3) 0 is the root node
215          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217     */
218     comm_size = MPI_COMM_WORLD->size();
219     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220     send_size = parse_double(action[2]);
221     disps     = std::vector<int>(comm_size, 0);
222     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
223
224     if (name == "gatherV") {
225       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226       if (action.size() > 4 + comm_size)
227         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228       if (action.size() > 5 + comm_size)
229         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
230     }
231     else {
232       int datatype_index = 0, disp_index = 0;
233       if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
234         datatype_index = 3 + comm_size;
235         disp_index     = datatype_index + 1;
236       } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
237         datatype_index = -1;
238         disp_index     = 3 + comm_size;
239       } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
240         datatype_index = 3 + comm_size;
241       }
242
243       if (disp_index != 0) {
244         for (unsigned int i = 0; i < comm_size; i++)
245           disps[i]          = std::stoi(action[disp_index + i]);
246       }
247
248       datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
249       datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
250     }
251
252     for (unsigned int i = 0; i < comm_size; i++) {
253       (*recvcounts)[i] = std::stoi(action[i + 3]);
254     }
255     recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
256   }
257 };
258
259 class ScatterArgParser : public CollCommParser {
260 public:
261   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
262   {
263     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
264           0 gather 68 68 0 0 0
265         where:
266           1) 68 is the sendcounts
267           2) 68 is the recvcounts
268           3) 0 is the root node
269           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
270           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
271     */
272     CHECK_ACTION_PARAMS(action, 2, 3)
273     comm_size   = MPI_COMM_WORLD->size();
274     send_size   = parse_double(action[2]);
275     recv_size   = parse_double(action[3]);
276     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
277     if (action.size() > 5)
278       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
279     if (action.size() > 6)
280       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
281   }
282 };
283
284 class ScatterVArgParser : public CollCommParser {
285 public:
286   int recv_sum;
287   int send_sum;
288   std::shared_ptr<std::vector<int>> sendcounts;
289   std::vector<int> disps;
290   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
291   {
292     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
293        0 gather 68 10 10 10 68 0 0 0
294         where:
295         1) 68 10 10 10 is the sendcounts
296         2) 68 is the recvcount
297         3) 0 is the root node
298         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
299         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
300     */
301     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
302     recv_size  = parse_double(action[2 + comm_size]);
303     disps      = std::vector<int>(comm_size, 0);
304     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
305
306     if (action.size() > 5 + comm_size)
307       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
308     if (action.size() > 5 + comm_size)
309       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
310
311     for (unsigned int i = 0; i < comm_size; i++) {
312       (*sendcounts)[i] = std::stoi(action[i + 2]);
313     }
314     send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
315     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
316   }
317 };
318
319 class ReduceScatterArgParser : public CollCommParser {
320 public:
321   int recv_sum;
322   std::shared_ptr<std::vector<int>> recvcounts;
323   std::vector<int> disps;
324   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
325   {
326     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
327          0 reduceScatter 275427 275427 275427 204020 11346849 0
328        where:
329          1) The first four values after the name of the action declare the recvcounts array
330          2) The value 11346849 is the amount of instructions
331          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
332     */
333     comm_size = MPI_COMM_WORLD->size();
334     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
335     comp_size = parse_double(action[2+comm_size]);
336     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
337     if (action.size() > 3 + comm_size)
338       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
339
340     for (unsigned int i = 0; i < comm_size; i++) {
341       recvcounts->push_back(std::stoi(action[i + 2]));
342     }
343     recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
344   }
345 };
346
347 template <class T> class ReplayAction {
348 protected:
349   const std::string name;
350   T args;
351
352   int my_proc_id;
353
354 public:
355   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
356
357   virtual void execute(simgrid::xbt::ReplayAction& action)
358   {
359     // Needs to be re-initialized for every action, hence here
360     double start_time = smpi_process()->simulated_elapsed();
361     args.parse(action, name);
362     kernel(action);
363     if (name != "Init")
364       log_timed_action(action, start_time);
365   }
366
367   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
368
369   void* send_buffer(int size)
370   {
371     return smpi_get_tmp_sendbuffer(size);
372   }
373
374   void* recv_buffer(int size)
375   {
376     return smpi_get_tmp_recvbuffer(size);
377   }
378 };
379
380 class WaitAction : public ReplayAction<ActionArgParser> {
381 public:
382   WaitAction() : ReplayAction("Wait") {}
383   void kernel(simgrid::xbt::ReplayAction& action) override
384   {
385     std::string s = boost::algorithm::join(action, " ");
386     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
387     MPI_Request request = get_reqq_self()->back();
388     get_reqq_self()->pop_back();
389
390     if (request == nullptr) {
391       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
392        * return.*/
393       return;
394     }
395
396     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
397
398     // Must be taken before Request::wait() since the request may be set to
399     // MPI_REQUEST_NULL by Request::wait!
400     int src                  = request->comm()->group()->rank(request->src());
401     int dst                  = request->comm()->group()->rank(request->dst());
402     bool is_wait_for_receive = (request->flags() & RECV);
403     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
404     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
405
406     MPI_Status status;
407     Request::wait(&request, &status);
408
409     TRACE_smpi_comm_out(rank);
410     if (is_wait_for_receive)
411       TRACE_smpi_recv(src, dst, 0);
412   }
413 };
414
415 class SendAction : public ReplayAction<SendRecvParser> {
416 public:
417   SendAction() = delete;
418   SendAction(std::string name) : ReplayAction(name) {}
419   void kernel(simgrid::xbt::ReplayAction& action) override
420   {
421     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
422
423     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
424                                                                                  Datatype::encode(args.datatype1)));
425     if (not TRACE_smpi_view_internals())
426       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
427
428     if (name == "send") {
429       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
430     } else if (name == "Isend") {
431       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
432       get_reqq_self()->push_back(request);
433     } else {
434       xbt_die("Don't know this action, %s", name.c_str());
435     }
436
437     TRACE_smpi_comm_out(my_proc_id);
438   }
439 };
440
441 class RecvAction : public ReplayAction<SendRecvParser> {
442 public:
443   RecvAction() = delete;
444   explicit RecvAction(std::string name) : ReplayAction(name) {}
445   void kernel(simgrid::xbt::ReplayAction& action) override
446   {
447     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
448
449     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
450                                                                                  Datatype::encode(args.datatype1)));
451
452     MPI_Status status;
453     // unknown size from the receiver point of view
454     if (args.size <= 0.0) {
455       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
456       args.size = status.count;
457     }
458
459     if (name == "recv") {
460       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
461     } else if (name == "Irecv") {
462       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
463       get_reqq_self()->push_back(request);
464     }
465
466     TRACE_smpi_comm_out(my_proc_id);
467     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
468     if (name == "recv" && not TRACE_smpi_view_internals()) {
469       TRACE_smpi_recv(src_traced, my_proc_id, 0);
470     }
471   }
472 };
473
474 class ComputeAction : public ReplayAction<ComputeParser> {
475 public:
476   ComputeAction() : ReplayAction("compute") {}
477   void kernel(simgrid::xbt::ReplayAction& action) override
478   {
479     TRACE_smpi_computing_in(my_proc_id, args.flops);
480     smpi_execute_flops(args.flops);
481     TRACE_smpi_computing_out(my_proc_id);
482   }
483 };
484
485 class TestAction : public ReplayAction<ActionArgParser> {
486 public:
487   TestAction() : ReplayAction("Test") {}
488   void kernel(simgrid::xbt::ReplayAction& action) override
489   {
490     MPI_Request request = get_reqq_self()->back();
491     get_reqq_self()->pop_back();
492     // if request is null here, this may mean that a previous test has succeeded
493     // Different times in traced application and replayed version may lead to this
494     // In this case, ignore the extra calls.
495     if (request != nullptr) {
496       TRACE_smpi_testing_in(my_proc_id);
497
498       MPI_Status status;
499       int flag = Request::test(&request, &status);
500
501       XBT_DEBUG("MPI_Test result: %d", flag);
502       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
503        * nullptr.*/
504       get_reqq_self()->push_back(request);
505
506       TRACE_smpi_testing_out(my_proc_id);
507     }
508   }
509 };
510
511 class InitAction : public ReplayAction<ActionArgParser> {
512 public:
513   InitAction() : ReplayAction("Init") {}
514   void kernel(simgrid::xbt::ReplayAction& action) override
515   {
516     CHECK_ACTION_PARAMS(action, 0, 1)
517     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
518                                            : MPI_BYTE;  // default TAU datatype
519
520     /* start a simulated timer */
521     smpi_process()->simulated_start();
522     /*initialize the number of active processes */
523     active_processes = smpi_process_count();
524
525     set_reqq_self(new std::vector<MPI_Request>);
526   }
527 };
528
529 class CommunicatorAction : public ReplayAction<ActionArgParser> {
530 public:
531   CommunicatorAction() : ReplayAction("Comm") {}
532   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
533 };
534
535 class WaitAllAction : public ReplayAction<ActionArgParser> {
536 public:
537   WaitAllAction() : ReplayAction("waitAll") {}
538   void kernel(simgrid::xbt::ReplayAction& action) override
539   {
540     const unsigned int count_requests = get_reqq_self()->size();
541
542     if (count_requests > 0) {
543       TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
544                          new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
545       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
546       for (const auto& req : (*get_reqq_self())) {
547         if (req && (req->flags() & RECV)) {
548           sender_receiver.push_back({req->src(), req->dst()});
549         }
550       }
551       MPI_Status status[count_requests];
552       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
553
554       for (auto& pair : sender_receiver) {
555         TRACE_smpi_recv(pair.first, pair.second, 0);
556       }
557       TRACE_smpi_comm_out(my_proc_id);
558     }
559   }
560 };
561
562 class BarrierAction : public ReplayAction<ActionArgParser> {
563 public:
564   BarrierAction() : ReplayAction("barrier") {}
565   void kernel(simgrid::xbt::ReplayAction& action) override
566   {
567     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
568     Colls::barrier(MPI_COMM_WORLD);
569     TRACE_smpi_comm_out(my_proc_id);
570   }
571 };
572
573 class BcastAction : public ReplayAction<BcastArgParser> {
574 public:
575   BcastAction() : ReplayAction("bcast") {}
576   void kernel(simgrid::xbt::ReplayAction& action) override
577   {
578     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
579                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
580                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
581
582     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
583
584     TRACE_smpi_comm_out(my_proc_id);
585   }
586 };
587
588 class ReduceAction : public ReplayAction<ReduceArgParser> {
589 public:
590   ReduceAction() : ReplayAction("reduce") {}
591   void kernel(simgrid::xbt::ReplayAction& action) override
592   {
593     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
594                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
595                                                       args.comm_size, -1, Datatype::encode(args.datatype1), ""));
596
597     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
598         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
599     smpi_execute_flops(args.comp_size);
600
601     TRACE_smpi_comm_out(my_proc_id);
602   }
603 };
604
605 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
606 public:
607   AllReduceAction() : ReplayAction("allReduce") {}
608   void kernel(simgrid::xbt::ReplayAction& action) override
609   {
610     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
611                                                                                 Datatype::encode(args.datatype1), ""));
612
613     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
614         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
615     smpi_execute_flops(args.comp_size);
616
617     TRACE_smpi_comm_out(my_proc_id);
618   }
619 };
620
621 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
622 public:
623   AllToAllAction() : ReplayAction("allToAll") {}
624   void kernel(simgrid::xbt::ReplayAction& action) override
625   {
626     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
627                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
628                                                     Datatype::encode(args.datatype1),
629                                                     Datatype::encode(args.datatype2)));
630
631     Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()), 
632       args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
633       args.recv_size, args.datatype2, MPI_COMM_WORLD);
634
635     TRACE_smpi_comm_out(my_proc_id);
636   }
637 };
638
639 class GatherAction : public ReplayAction<GatherArgParser> {
640 public:
641   GatherAction(std::string name) : ReplayAction(name) {}
642   void kernel(simgrid::xbt::ReplayAction& action) override
643   {
644     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
645                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
646
647     if (name == "gather") {
648       int rank = MPI_COMM_WORLD->rank();
649       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
650                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
651     }
652     else
653       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
654                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
655
656     TRACE_smpi_comm_out(my_proc_id);
657   }
658 };
659
660 class GatherVAction : public ReplayAction<GatherVArgParser> {
661 public:
662   GatherVAction(std::string name) : ReplayAction(name) {}
663   void kernel(simgrid::xbt::ReplayAction& action) override
664   {
665     int rank = MPI_COMM_WORLD->rank();
666
667     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
668                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
669                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
670
671     if (name == "gatherV") {
672       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
673                      (rank == args.root) ? recv_buffer(args.recv_sum  * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
674                      MPI_COMM_WORLD);
675     }
676     else {
677       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
678                         recv_buffer(args.recv_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
679                     MPI_COMM_WORLD);
680     }
681
682     TRACE_smpi_comm_out(my_proc_id);
683   }
684 };
685
686 class ScatterAction : public ReplayAction<ScatterArgParser> {
687 public:
688   ScatterAction() : ReplayAction("scatter") {}
689   void kernel(simgrid::xbt::ReplayAction& action) override
690   {
691     int rank = MPI_COMM_WORLD->rank();
692     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
693                                                                           Datatype::encode(args.datatype1),
694                                                                           Datatype::encode(args.datatype2)));
695
696     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
698
699     TRACE_smpi_comm_out(my_proc_id);
700   }
701 };
702
703
704 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
705 public:
706   ScatterVAction() : ReplayAction("scatterV") {}
707   void kernel(simgrid::xbt::ReplayAction& action) override
708   {
709     int rank = MPI_COMM_WORLD->rank();
710     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
711           nullptr, Datatype::encode(args.datatype1),
712           Datatype::encode(args.datatype2)));
713
714     Colls::scatterv((rank == args.root) ? send_buffer(args.send_sum * args.datatype1->size()) : nullptr, args.sendcounts->data(), args.disps.data(), 
715         args.datatype1, recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
716         MPI_COMM_WORLD);
717
718     TRACE_smpi_comm_out(my_proc_id);
719   }
720 };
721
722 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
723 public:
724   ReduceScatterAction() : ReplayAction("reduceScatter") {}
725   void kernel(simgrid::xbt::ReplayAction& action) override
726   {
727     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
728                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
729                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
730                                                          Datatype::encode(args.datatype1)));
731
732     Colls::reduce_scatter(send_buffer(args.recv_sum * args.datatype1->size()), recv_buffer(args.recv_sum * args.datatype1->size()), 
733                           args.recvcounts->data(), args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
734
735     smpi_execute_flops(args.comp_size);
736     TRACE_smpi_comm_out(my_proc_id);
737   }
738 };
739 } // Replay Namespace
740
741 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
742 {
743   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
744         0 allToAllV 100 1 7 10 12 100 1 70 10 5
745      where:
746         1) 100 is the size of the send buffer *sizeof(int),
747         2) 1 7 10 12 is the sendcounts array
748         3) 100*sizeof(int) is the size of the receiver buffer
749         4)  1 70 10 5 is the recvcounts array
750   */
751   double clock = smpi_process()->simulated_elapsed();
752
753   unsigned long comm_size = MPI_COMM_WORLD->size();
754   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
755   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
756   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
757   std::vector<int> senddisps(comm_size, 0);
758   std::vector<int> recvdisps(comm_size, 0);
759
760   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
761                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
762                                       : MPI_DEFAULT_TYPE;
763   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
764                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
765                                      : MPI_DEFAULT_TYPE};
766
767   int send_buf_size=parse_double(action[2]);
768   int recv_buf_size=parse_double(action[3+comm_size]);
769   int my_proc_id = Actor::self()->getPid();
770   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
771   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
772
773   for (unsigned int i = 0; i < comm_size; i++) {
774     (*sendcounts)[i] = std::stoi(action[3 + i]);
775     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
776   }
777   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
778   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
779
780   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
781                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
782                                                        Datatype::encode(MPI_CURRENT_TYPE),
783                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
784
785   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
786                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
787
788   TRACE_smpi_comm_out(my_proc_id);
789   log_timed_action (action, clock);
790 }
791
792 }} // namespace simgrid::smpi
793
794 /** @brief Only initialize the replay, don't do it for real */
795 void smpi_replay_init(int* argc, char*** argv)
796 {
797   simgrid::smpi::Process::init(argc, argv);
798   smpi_process()->mark_as_initialized();
799   smpi_process()->set_replaying(true);
800
801   int my_proc_id = Actor::self()->getPid();
802   TRACE_smpi_init(my_proc_id);
803   TRACE_smpi_computing_init(my_proc_id);
804   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
805   TRACE_smpi_comm_out(my_proc_id);
806   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
807   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
808   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
809   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
810   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
811
812   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
813   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
814   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
815   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
816   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
817   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
818   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
819   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
820   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
821   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
822   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
823   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
824   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
825   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
826   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterAction().execute(action); });
827   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("gatherV").execute(action); });
828   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterVAction().execute(action); });
829   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
830   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("allGatherV").execute(action); });
831   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceScatterAction().execute(action); });
832   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
833
834   //if we have a delayed start, sleep here.
835   if(*argc>2){
836     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
837     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
838     smpi_execute_flops(value);
839   } else {
840     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
841     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
842     smpi_execute_flops(0.0);
843   }
844 }
845
846 /** @brief actually run the replay after initialization */
847 void smpi_replay_main(int* argc, char*** argv)
848 {
849   simgrid::xbt::replay_runner(*argc, *argv);
850
851   /* and now, finalize everything */
852   /* One active process will stop. Decrease the counter*/
853   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
854   if (not get_reqq_self()->empty()) {
855     unsigned int count_requests=get_reqq_self()->size();
856     MPI_Request requests[count_requests];
857     MPI_Status status[count_requests];
858     unsigned int i=0;
859
860     for (auto const& req : *get_reqq_self()) {
861       requests[i] = req;
862       i++;
863     }
864     simgrid::smpi::Request::waitall(count_requests, requests, status);
865   }
866   delete get_reqq_self();
867   active_processes--;
868
869   if(active_processes==0){
870     /* Last process alive speaking: end the simulated timer */
871     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
872     smpi_free_replay_tmp_buffers();
873   }
874
875   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
876
877   smpi_process()->finalize();
878
879   TRACE_smpi_comm_out(Actor::self()->getPid());
880   TRACE_smpi_finalize(Actor::self()->getPid());
881 }
882
883 /** @brief chain a replay initialization and a replay start */
884 void smpi_replay_run(int* argc, char*** argv)
885 {
886   smpi_replay_init(argc, argv);
887   smpi_replay_main(argc, argv);
888 }