Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
58af69b116075fdcb4f59a251715edebe8196616
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 class CollCommParser : public ActionArgParser {
104 public:
105   double size;
106   double comm_size;
107   double comp_size;
108   int send_size;
109   int recv_size;
110   int root = 0;
111   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
113 };
114
115 class BcastArgParser : public CollCommParser {
116 public:
117   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
118   {
119     CHECK_ACTION_PARAMS(action, 1, 2)
120     size = parse_double(action[2]);
121     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122     if (action.size() > 4)
123       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
124   }
125 };
126
127 class ReduceArgParser : public CollCommParser {
128 public:
129   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
130   {
131     CHECK_ACTION_PARAMS(action, 2, 2)
132     comm_size = parse_double(action[2]);
133     comp_size = parse_double(action[3]);
134     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
135     if (action.size() > 5)
136       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
137   }
138 };
139
140 class AllReduceArgParser : public CollCommParser {
141 public:
142   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
143   {
144     CHECK_ACTION_PARAMS(action, 2, 1)
145     comm_size = parse_double(action[2]);
146     comp_size = parse_double(action[3]);
147     if (action.size() > 4)
148       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
149   }
150 };
151
152 class AllToAllArgParser : public CollCommParser {
153 public:
154   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
155   {
156     CHECK_ACTION_PARAMS(action, 2, 1)
157     comm_size = MPI_COMM_WORLD->size();
158     send_size = parse_double(action[2]);
159     recv_size = parse_double(action[3]);
160
161     if (action.size() > 4)
162       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163     if (action.size() > 5)
164       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
165   }
166 };
167
168 class GatherArgParser : public CollCommParser {
169 public:
170   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
171   {
172     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
173           0 gather 68 68 0 0 0
174         where:
175           1) 68 is the sendcounts
176           2) 68 is the recvcounts
177           3) 0 is the root node
178           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180     */
181     CHECK_ACTION_PARAMS(action, 2, 3)
182     comm_size = MPI_COMM_WORLD->size();
183     send_size = parse_double(action[2]);
184     recv_size = parse_double(action[3]);
185
186     if (name == "gather") {
187       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
188       if (action.size() > 5)
189         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190       if (action.size() > 6)
191         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
192     }
193     else {
194       if (action.size() > 4)
195         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196       if (action.size() > 5)
197         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
198     }
199   }
200 };
201
202 class GatherVArgParser : public CollCommParser {
203 public:
204   int recv_sum;
205   std::shared_ptr<std::vector<int>> recvcounts;
206   std::vector<int> disps;
207   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
208   {
209     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
210          0 gather 68 68 10 10 10 0 0 0
211        where:
212          1) 68 is the sendcount
213          2) 68 10 10 10 is the recvcounts
214          3) 0 is the root node
215          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217     */
218     comm_size = MPI_COMM_WORLD->size();
219     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
220     send_size = parse_double(action[2]);
221     disps     = std::vector<int>(comm_size, 0);
222     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
223
224     if (name == "gatherV") {
225       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
226       if (action.size() > 4 + comm_size)
227         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
228       if (action.size() > 5 + comm_size)
229         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
230     }
231     else {
232       int datatype_index = 0, disp_index = 0;
233       if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
234         datatype_index = 3 + comm_size;
235         disp_index     = datatype_index + 1;
236       } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
237         datatype_index = -1;
238         disp_index     = 3 + comm_size;
239       } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
240         datatype_index = 3 + comm_size;
241       }
242
243       if (disp_index != 0) {
244         for (unsigned int i = 0; i < comm_size; i++)
245           disps[i]          = std::stoi(action[disp_index + i]);
246       }
247
248       datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
249       datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
250     }
251
252     for (unsigned int i = 0; i < comm_size; i++) {
253       (*recvcounts)[i] = std::stoi(action[i + 3]);
254     }
255     recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
256   }
257 };
258
259 class ScatterArgParser : public CollCommParser {
260 public:
261   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
262   {
263     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
264           0 gather 68 68 0 0 0
265         where:
266           1) 68 is the sendcounts
267           2) 68 is the recvcounts
268           3) 0 is the root node
269           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
270           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
271     */
272     CHECK_ACTION_PARAMS(action, 2, 3)
273     comm_size   = MPI_COMM_WORLD->size();
274     send_size   = parse_double(action[2]);
275     recv_size   = parse_double(action[3]);
276     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
277     if (action.size() > 5)
278       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
279     if (action.size() > 6)
280       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
281   }
282 };
283
284 template <class T> class ReplayAction {
285 protected:
286   const std::string name;
287   T args;
288
289   int my_proc_id;
290
291 public:
292   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
293
294   virtual void execute(simgrid::xbt::ReplayAction& action)
295   {
296     // Needs to be re-initialized for every action, hence here
297     double start_time = smpi_process()->simulated_elapsed();
298     args.parse(action, name);
299     kernel(action);
300     if (name != "Init")
301       log_timed_action(action, start_time);
302   }
303
304   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
305
306   void* send_buffer(int size)
307   {
308     return smpi_get_tmp_sendbuffer(size);
309   }
310
311   void* recv_buffer(int size)
312   {
313     return smpi_get_tmp_recvbuffer(size);
314   }
315 };
316
317 class WaitAction : public ReplayAction<ActionArgParser> {
318 public:
319   WaitAction() : ReplayAction("Wait") {}
320   void kernel(simgrid::xbt::ReplayAction& action) override
321   {
322     std::string s = boost::algorithm::join(action, " ");
323     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
324     MPI_Request request = get_reqq_self()->back();
325     get_reqq_self()->pop_back();
326
327     if (request == nullptr) {
328       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
329        * return.*/
330       return;
331     }
332
333     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
334
335     // Must be taken before Request::wait() since the request may be set to
336     // MPI_REQUEST_NULL by Request::wait!
337     int src                  = request->comm()->group()->rank(request->src());
338     int dst                  = request->comm()->group()->rank(request->dst());
339     bool is_wait_for_receive = (request->flags() & RECV);
340     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
341     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
342
343     MPI_Status status;
344     Request::wait(&request, &status);
345
346     TRACE_smpi_comm_out(rank);
347     if (is_wait_for_receive)
348       TRACE_smpi_recv(src, dst, 0);
349   }
350 };
351
352 class SendAction : public ReplayAction<SendRecvParser> {
353 public:
354   SendAction() = delete;
355   SendAction(std::string name) : ReplayAction(name) {}
356   void kernel(simgrid::xbt::ReplayAction& action) override
357   {
358     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
359
360     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
361                                                                                  Datatype::encode(args.datatype1)));
362     if (not TRACE_smpi_view_internals())
363       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
364
365     if (name == "send") {
366       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
367     } else if (name == "Isend") {
368       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
369       get_reqq_self()->push_back(request);
370     } else {
371       xbt_die("Don't know this action, %s", name.c_str());
372     }
373
374     TRACE_smpi_comm_out(my_proc_id);
375   }
376 };
377
378 class RecvAction : public ReplayAction<SendRecvParser> {
379 public:
380   RecvAction() = delete;
381   explicit RecvAction(std::string name) : ReplayAction(name) {}
382   void kernel(simgrid::xbt::ReplayAction& action) override
383   {
384     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
385
386     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
387                                                                                  Datatype::encode(args.datatype1)));
388
389     MPI_Status status;
390     // unknown size from the receiver point of view
391     if (args.size <= 0.0) {
392       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
393       args.size = status.count;
394     }
395
396     if (name == "recv") {
397       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
398     } else if (name == "Irecv") {
399       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
400       get_reqq_self()->push_back(request);
401     }
402
403     TRACE_smpi_comm_out(my_proc_id);
404     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
405     if (name == "recv" && not TRACE_smpi_view_internals()) {
406       TRACE_smpi_recv(src_traced, my_proc_id, 0);
407     }
408   }
409 };
410
411 class ComputeAction : public ReplayAction<ComputeParser> {
412 public:
413   ComputeAction() : ReplayAction("compute") {}
414   void kernel(simgrid::xbt::ReplayAction& action) override
415   {
416     TRACE_smpi_computing_in(my_proc_id, args.flops);
417     smpi_execute_flops(args.flops);
418     TRACE_smpi_computing_out(my_proc_id);
419   }
420 };
421
422 class TestAction : public ReplayAction<ActionArgParser> {
423 public:
424   TestAction() : ReplayAction("Test") {}
425   void kernel(simgrid::xbt::ReplayAction& action) override
426   {
427     MPI_Request request = get_reqq_self()->back();
428     get_reqq_self()->pop_back();
429     // if request is null here, this may mean that a previous test has succeeded
430     // Different times in traced application and replayed version may lead to this
431     // In this case, ignore the extra calls.
432     if (request != nullptr) {
433       TRACE_smpi_testing_in(my_proc_id);
434
435       MPI_Status status;
436       int flag = Request::test(&request, &status);
437
438       XBT_DEBUG("MPI_Test result: %d", flag);
439       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
440        * nullptr.*/
441       get_reqq_self()->push_back(request);
442
443       TRACE_smpi_testing_out(my_proc_id);
444     }
445   }
446 };
447
448 class InitAction : public ReplayAction<ActionArgParser> {
449 public:
450   InitAction() : ReplayAction("Init") {}
451   void kernel(simgrid::xbt::ReplayAction& action) override
452   {
453     CHECK_ACTION_PARAMS(action, 0, 1)
454     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
455                                            : MPI_BYTE;  // default TAU datatype
456
457     /* start a simulated timer */
458     smpi_process()->simulated_start();
459     /*initialize the number of active processes */
460     active_processes = smpi_process_count();
461
462     set_reqq_self(new std::vector<MPI_Request>);
463   }
464 };
465
466 class CommunicatorAction : public ReplayAction<ActionArgParser> {
467 public:
468   CommunicatorAction() : ReplayAction("Comm") {}
469   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
470 };
471
472 class WaitAllAction : public ReplayAction<ActionArgParser> {
473 public:
474   WaitAllAction() : ReplayAction("waitAll") {}
475   void kernel(simgrid::xbt::ReplayAction& action) override
476   {
477     const unsigned int count_requests = get_reqq_self()->size();
478
479     if (count_requests > 0) {
480       TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
481                          new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
482       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
483       for (const auto& req : (*get_reqq_self())) {
484         if (req && (req->flags() & RECV)) {
485           sender_receiver.push_back({req->src(), req->dst()});
486         }
487       }
488       MPI_Status status[count_requests];
489       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
490
491       for (auto& pair : sender_receiver) {
492         TRACE_smpi_recv(pair.first, pair.second, 0);
493       }
494       TRACE_smpi_comm_out(my_proc_id);
495     }
496   }
497 };
498
499 class BarrierAction : public ReplayAction<ActionArgParser> {
500 public:
501   BarrierAction() : ReplayAction("barrier") {}
502   void kernel(simgrid::xbt::ReplayAction& action) override
503   {
504     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
505     Colls::barrier(MPI_COMM_WORLD);
506     TRACE_smpi_comm_out(my_proc_id);
507   }
508 };
509
510 class BcastAction : public ReplayAction<BcastArgParser> {
511 public:
512   BcastAction() : ReplayAction("bcast") {}
513   void kernel(simgrid::xbt::ReplayAction& action) override
514   {
515     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
516                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
517                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
518
519     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
520
521     TRACE_smpi_comm_out(my_proc_id);
522   }
523 };
524
525 class ReduceAction : public ReplayAction<ReduceArgParser> {
526 public:
527   ReduceAction() : ReplayAction("reduce") {}
528   void kernel(simgrid::xbt::ReplayAction& action) override
529   {
530     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
531                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
532                                                       args.comm_size, -1, Datatype::encode(args.datatype1), ""));
533
534     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
535         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
536     smpi_execute_flops(args.comp_size);
537
538     TRACE_smpi_comm_out(my_proc_id);
539   }
540 };
541
542 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
543 public:
544   AllReduceAction() : ReplayAction("allReduce") {}
545   void kernel(simgrid::xbt::ReplayAction& action) override
546   {
547     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
548                                                                                 Datatype::encode(args.datatype1), ""));
549
550     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
551         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
552     smpi_execute_flops(args.comp_size);
553
554     TRACE_smpi_comm_out(my_proc_id);
555   }
556 };
557
558 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
559 public:
560   AllToAllAction() : ReplayAction("allToAll") {}
561   void kernel(simgrid::xbt::ReplayAction& action) override
562   {
563     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
564                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
565                                                     Datatype::encode(args.datatype1),
566                                                     Datatype::encode(args.datatype2)));
567
568     Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()), 
569       args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
570       args.recv_size, args.datatype2, MPI_COMM_WORLD);
571
572     TRACE_smpi_comm_out(my_proc_id);
573   }
574 };
575
576 class GatherAction : public ReplayAction<GatherArgParser> {
577 public:
578   GatherAction(std::string name) : ReplayAction(name) {}
579   void kernel(simgrid::xbt::ReplayAction& action) override
580   {
581     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
582                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
583
584     if (name == "gather") {
585       int rank = MPI_COMM_WORLD->rank();
586       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
587                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
588     }
589     else
590       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
591                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
592
593     TRACE_smpi_comm_out(my_proc_id);
594   }
595 };
596
597 class GatherVAction : public ReplayAction<GatherVArgParser> {
598 public:
599   GatherVAction(std::string name) : ReplayAction(name) {}
600   void kernel(simgrid::xbt::ReplayAction& action) override
601   {
602     int rank = MPI_COMM_WORLD->rank();
603
604     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
605                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
606                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
607
608     if (name == "gatherV") {
609       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
610                      (rank == args.root) ? recv_buffer(args.recv_sum  * args.datatype2->size()) : nullptr, args.recvcounts->data(), args.disps.data(), args.datatype2, args.root,
611                      MPI_COMM_WORLD);
612     }
613     else {
614       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1, 
615                         recv_buffer(args.recv_sum * args.datatype2->size()), args.recvcounts->data(), args.disps.data(), args.datatype2,
616                     MPI_COMM_WORLD);
617     }
618
619     TRACE_smpi_comm_out(my_proc_id);
620   }
621 };
622
623 class ScatterAction : public ReplayAction<ScatterArgParser> {
624 public:
625   ScatterAction() : ReplayAction("scatter") {}
626   void kernel(simgrid::xbt::ReplayAction& action) override
627   {
628     int rank = MPI_COMM_WORLD->rank();
629     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
630                                                                           Datatype::encode(args.datatype1),
631                                                                           Datatype::encode(args.datatype2)));
632
633     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
634                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
635
636     TRACE_smpi_comm_out(my_proc_id);
637   }
638 };
639 } // Replay Namespace
640
641 static void action_scatterv(simgrid::xbt::ReplayAction& action)
642 {
643   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
644        0 gather 68 10 10 10 68 0 0 0
645      where:
646        1) 68 10 10 10 is the sendcounts
647        2) 68 is the recvcount
648        3) 0 is the root node
649        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
650        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
651   */
652   double clock  = smpi_process()->simulated_elapsed();
653   unsigned long comm_size = MPI_COMM_WORLD->size();
654   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
655   int recv_size = parse_double(action[2 + comm_size]);
656   std::vector<int> disps(comm_size, 0);
657   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
658
659   MPI_Datatype MPI_CURRENT_TYPE =
660       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
661   MPI_Datatype MPI_CURRENT_TYPE2{
662       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
663
664   void* send = nullptr;
665   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
666   for (unsigned int i = 0; i < comm_size; i++) {
667     (*sendcounts)[i] = std::stoi(action[i + 2]);
668   }
669   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
670
671   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
672   int rank = MPI_COMM_WORLD->rank();
673
674   if (rank == root)
675     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
676
677   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
678                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
679                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
680
681   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
682                   MPI_COMM_WORLD);
683
684   TRACE_smpi_comm_out(Actor::self()->getPid());
685   log_timed_action(action, clock);
686 }
687
688 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
689 {
690   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
691        0 reduceScatter 275427 275427 275427 204020 11346849 0
692      where:
693        1) The first four values after the name of the action declare the recvcounts array
694        2) The value 11346849 is the amount of instructions
695        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
696  */
697   double clock = smpi_process()->simulated_elapsed();
698   unsigned long comm_size = MPI_COMM_WORLD->size();
699   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
700   int comp_size = parse_double(action[2+comm_size]);
701   int my_proc_id                     = Actor::self()->getPid();
702   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
703   MPI_Datatype MPI_CURRENT_TYPE =
704       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
705
706   for (unsigned int i = 0; i < comm_size; i++) {
707     recvcounts->push_back(std::stoi(action[i + 2]));
708   }
709   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
710
711   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
712                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
713                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
714                                                        Datatype::encode(MPI_CURRENT_TYPE)));
715
716   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
717   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
718
719   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
720   smpi_execute_flops(comp_size);
721
722   TRACE_smpi_comm_out(my_proc_id);
723   log_timed_action (action, clock);
724 }
725
726 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
727 {
728   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
729         0 allToAllV 100 1 7 10 12 100 1 70 10 5
730      where:
731         1) 100 is the size of the send buffer *sizeof(int),
732         2) 1 7 10 12 is the sendcounts array
733         3) 100*sizeof(int) is the size of the receiver buffer
734         4)  1 70 10 5 is the recvcounts array
735   */
736   double clock = smpi_process()->simulated_elapsed();
737
738   unsigned long comm_size = MPI_COMM_WORLD->size();
739   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
740   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
741   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
742   std::vector<int> senddisps(comm_size, 0);
743   std::vector<int> recvdisps(comm_size, 0);
744
745   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
746                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
747                                       : MPI_DEFAULT_TYPE;
748   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
749                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
750                                      : MPI_DEFAULT_TYPE};
751
752   int send_buf_size=parse_double(action[2]);
753   int recv_buf_size=parse_double(action[3+comm_size]);
754   int my_proc_id = Actor::self()->getPid();
755   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
756   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
757
758   for (unsigned int i = 0; i < comm_size; i++) {
759     (*sendcounts)[i] = std::stoi(action[3 + i]);
760     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
761   }
762   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
763   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
764
765   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
766                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
767                                                        Datatype::encode(MPI_CURRENT_TYPE),
768                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
769
770   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
771                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
772
773   TRACE_smpi_comm_out(my_proc_id);
774   log_timed_action (action, clock);
775 }
776
777 }} // namespace simgrid::smpi
778
779 /** @brief Only initialize the replay, don't do it for real */
780 void smpi_replay_init(int* argc, char*** argv)
781 {
782   simgrid::smpi::Process::init(argc, argv);
783   smpi_process()->mark_as_initialized();
784   smpi_process()->set_replaying(true);
785
786   int my_proc_id = Actor::self()->getPid();
787   TRACE_smpi_init(my_proc_id);
788   TRACE_smpi_computing_init(my_proc_id);
789   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
790   TRACE_smpi_comm_out(my_proc_id);
791   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
792   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
793   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
794   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
795   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
796
797   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
798   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
799   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
800   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
801   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
802   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
803   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
804   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
805   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
806   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
807   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
808   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
809   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
810   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
811   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ScatterAction().execute(action); });
812   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("gatherV").execute(action); });
813   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
814   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
815   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherVAction("allGatherV").execute(action); });
816   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
817   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
818
819   //if we have a delayed start, sleep here.
820   if(*argc>2){
821     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
822     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
823     smpi_execute_flops(value);
824   } else {
825     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
826     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
827     smpi_execute_flops(0.0);
828   }
829 }
830
831 /** @brief actually run the replay after initialization */
832 void smpi_replay_main(int* argc, char*** argv)
833 {
834   simgrid::xbt::replay_runner(*argc, *argv);
835
836   /* and now, finalize everything */
837   /* One active process will stop. Decrease the counter*/
838   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
839   if (not get_reqq_self()->empty()) {
840     unsigned int count_requests=get_reqq_self()->size();
841     MPI_Request requests[count_requests];
842     MPI_Status status[count_requests];
843     unsigned int i=0;
844
845     for (auto const& req : *get_reqq_self()) {
846       requests[i] = req;
847       i++;
848     }
849     simgrid::smpi::Request::waitall(count_requests, requests, status);
850   }
851   delete get_reqq_self();
852   active_processes--;
853
854   if(active_processes==0){
855     /* Last process alive speaking: end the simulated timer */
856     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
857     smpi_free_replay_tmp_buffers();
858   }
859
860   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
861
862   smpi_process()->finalize();
863
864   TRACE_smpi_comm_out(Actor::self()->getPid());
865   TRACE_smpi_finalize(Actor::self()->getPid());
866 }
867
868 /** @brief chain a replay initialization and a replay start */
869 void smpi_replay_run(int* argc, char*** argv)
870 {
871   smpi_replay_init(argc, argv);
872   smpi_replay_main(argc, argv);
873 }