Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Make start_time a local variable
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 {
44   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45     std::string s = boost::algorithm::join(action, " ");
46     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
47   }
48 }
49
50 static std::vector<MPI_Request>* get_reqq_self()
51 {
52   return reqq.at(Actor::self()->getPid());
53 }
54
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 {
57    reqq.insert({Actor::self()->getPid(), mpi_request});
58 }
59
60 /* Helper function */
61 static double parse_double(std::string string)
62 {
63   return xbt_str_parse_double(string.c_str(), "%s is not a double");
64 }
65
66 namespace simgrid {
67 namespace smpi {
68
69 namespace Replay {
70 class ActionArgParser {
71 public:
72   virtual void parse(simgrid::xbt::ReplayAction& action){};
73 };
74
75 class SendRecvParser : public ActionArgParser {
76 public:
77   /* communication partner; if we send, this is the receiver and vice versa */
78   int partner;
79   double size;
80   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81
82   void parse(simgrid::xbt::ReplayAction& action) override
83   {
84     CHECK_ACTION_PARAMS(action, 2, 1)
85     partner = std::stoi(action[2]);
86     size    = parse_double(action[3]);
87     if (action.size() > 4)
88       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
89   }
90 };
91
92 class ComputeParser : public ActionArgParser {
93 public:
94   /* communication partner; if we send, this is the receiver and vice versa */
95   double flops;
96
97   void parse(simgrid::xbt::ReplayAction& action) override
98   {
99     CHECK_ACTION_PARAMS(action, 1, 0)
100     flops = parse_double(action[2]);
101   }
102 };
103
104 template <class T> class ReplayAction {
105 protected:
106   const std::string name;
107   T args;
108
109   int my_proc_id;
110
111 public:
112   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid())
113   {
114   }
115
116   virtual void execute(simgrid::xbt::ReplayAction& action)
117   {
118     // Needs to be re-initialized for every action, hence here
119     double start_time = smpi_process()->simulated_elapsed();
120     args.parse(action);
121     kernel(action);
122     log_timed_action(action, start_time);
123   }
124
125   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
126 };
127
128 class WaitAction : public ReplayAction<ActionArgParser> {
129 public:
130   WaitAction() : ReplayAction("Wait") {}
131   void kernel(simgrid::xbt::ReplayAction& action) override
132   {
133     CHECK_ACTION_PARAMS(action, 0, 0)
134     MPI_Status status;
135
136     std::string s = boost::algorithm::join(action, " ");
137     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
138     MPI_Request request = get_reqq_self()->back();
139     get_reqq_self()->pop_back();
140
141     if (request == nullptr) {
142       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
143        * return.*/
144       return;
145     }
146
147     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
148
149     MPI_Group group          = request->comm()->group();
150     int src_traced           = group->rank(request->src());
151     int dst_traced           = group->rank(request->dst());
152     bool is_wait_for_receive = (request->flags() & RECV);
153     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
154     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
155
156     Request::wait(&request, &status);
157
158     TRACE_smpi_comm_out(rank);
159     if (is_wait_for_receive)
160       TRACE_smpi_recv(src_traced, dst_traced, 0);
161   }
162 };
163
164 class SendAction : public ReplayAction<SendRecvParser> {
165 public:
166   SendAction() = delete;
167   SendAction(std::string name) : ReplayAction(name) {}
168   void kernel(simgrid::xbt::ReplayAction& action) override
169   {
170     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
171
172     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
173                                                                                  Datatype::encode(args.datatype1)));
174     if (not TRACE_smpi_view_internals())
175       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
176
177     if (name == "send") {
178       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
179     } else if (name == "Isend") {
180       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
181       get_reqq_self()->push_back(request);
182     } else {
183       xbt_die("Don't know this action, %s", name.c_str());
184     }
185
186     TRACE_smpi_comm_out(my_proc_id);
187   }
188 };
189
190 class RecvAction : public ReplayAction<SendRecvParser> {
191 public:
192   RecvAction() = delete;
193   explicit RecvAction(std::string name) : ReplayAction(name) {}
194   void kernel(simgrid::xbt::ReplayAction& action) override
195   {
196     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
197
198     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
199                                                                                  Datatype::encode(args.datatype1)));
200
201     MPI_Status status;
202     // unknown size from the receiver point of view
203     if (args.size <= 0.0) {
204       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
205       args.size = status.count;
206     }
207
208     if (name == "recv") {
209       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
210     } else if (name == "Irecv") {
211       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
212       get_reqq_self()->push_back(request);
213     }
214
215     TRACE_smpi_comm_out(my_proc_id);
216     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
217     if (name == "recv" && not TRACE_smpi_view_internals()) {
218       TRACE_smpi_recv(src_traced, my_proc_id, 0);
219     }
220   }
221 };
222
223 class ComputeAction : public ReplayAction<ComputeParser> {
224 public:
225   ComputeAction() : ReplayAction("compute") {}
226   void kernel(simgrid::xbt::ReplayAction& action) override
227   {
228     TRACE_smpi_computing_in(my_proc_id, args.flops);
229     smpi_execute_flops(args.flops);
230     TRACE_smpi_computing_out(my_proc_id);
231   }
232 };
233
234 } // Replay Namespace
235
236 static void action_init(simgrid::xbt::ReplayAction& action)
237 {
238   XBT_DEBUG("Initialize the counters");
239   CHECK_ACTION_PARAMS(action, 0, 1)
240   if (action.size() > 2)
241     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
242   else
243     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
244
245   /* start a simulated timer */
246   smpi_process()->simulated_start();
247   /*initialize the number of active processes */
248   active_processes = smpi_process_count();
249
250   set_reqq_self(new std::vector<MPI_Request>);
251 }
252
253 static void action_finalize(simgrid::xbt::ReplayAction& action)
254 {
255   /* Nothing to do */
256 }
257
258 static void action_comm_size(simgrid::xbt::ReplayAction& action)
259 {
260   communicator_size = parse_double(action[2]);
261   log_timed_action (action, smpi_process()->simulated_elapsed());
262 }
263
264 static void action_comm_split(simgrid::xbt::ReplayAction& action)
265 {
266   log_timed_action (action, smpi_process()->simulated_elapsed());
267 }
268
269 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
270 {
271   log_timed_action (action, smpi_process()->simulated_elapsed());
272 }
273
274 static void action_compute(simgrid::xbt::ReplayAction& action)
275 {
276   Replay::ComputeAction().execute(action);
277 }
278
279 static void action_test(simgrid::xbt::ReplayAction& action)
280 {
281   CHECK_ACTION_PARAMS(action, 0, 0)
282   double clock = smpi_process()->simulated_elapsed();
283   MPI_Status status;
284
285   MPI_Request request = get_reqq_self()->back();
286   get_reqq_self()->pop_back();
287   //if request is null here, this may mean that a previous test has succeeded
288   //Different times in traced application and replayed version may lead to this
289   //In this case, ignore the extra calls.
290   if(request!=nullptr){
291     int my_proc_id = Actor::self()->getPid();
292     TRACE_smpi_testing_in(my_proc_id);
293
294     int flag = Request::test(&request, &status);
295
296     XBT_DEBUG("MPI_Test result: %d", flag);
297     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
298     get_reqq_self()->push_back(request);
299
300     TRACE_smpi_testing_out(my_proc_id);
301   }
302   log_timed_action (action, clock);
303 }
304
305 static void action_waitall(simgrid::xbt::ReplayAction& action)
306 {
307   CHECK_ACTION_PARAMS(action, 0, 0)
308   double clock = smpi_process()->simulated_elapsed();
309   const unsigned int count_requests = get_reqq_self()->size();
310
311   if (count_requests>0) {
312     MPI_Status status[count_requests];
313
314     int my_proc_id_traced = Actor::self()->getPid();
315     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
316                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
317     int recvs_snd[count_requests];
318     int recvs_rcv[count_requests];
319     for (unsigned int i = 0; i < count_requests; i++) {
320       const auto& req = (*get_reqq_self())[i];
321       if (req && (req->flags() & RECV)) {
322         recvs_snd[i] = req->src();
323         recvs_rcv[i] = req->dst();
324       } else
325         recvs_snd[i] = -100;
326    }
327    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
328
329    for (unsigned i = 0; i < count_requests; i++) {
330      if (recvs_snd[i]!=-100)
331        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
332    }
333    TRACE_smpi_comm_out(my_proc_id_traced);
334   }
335   log_timed_action (action, clock);
336 }
337
338 static void action_barrier(simgrid::xbt::ReplayAction& action)
339 {
340   double clock = smpi_process()->simulated_elapsed();
341   int my_proc_id = Actor::self()->getPid();
342   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
343
344   Colls::barrier(MPI_COMM_WORLD);
345
346   TRACE_smpi_comm_out(my_proc_id);
347   log_timed_action (action, clock);
348 }
349
350 static void action_bcast(simgrid::xbt::ReplayAction& action)
351 {
352   CHECK_ACTION_PARAMS(action, 1, 2)
353   double size = parse_double(action[2]);
354   double clock = smpi_process()->simulated_elapsed();
355   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
356   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
357   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
358
359   int my_proc_id = Actor::self()->getPid();
360   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
361                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
362                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
363
364   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
365
366   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
367
368   TRACE_smpi_comm_out(my_proc_id);
369   log_timed_action (action, clock);
370 }
371
372 static void action_reduce(simgrid::xbt::ReplayAction& action)
373 {
374   CHECK_ACTION_PARAMS(action, 2, 2)
375   double comm_size = parse_double(action[2]);
376   double comp_size = parse_double(action[3]);
377   double clock = smpi_process()->simulated_elapsed();
378   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
379
380   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
381
382   int my_proc_id = Actor::self()->getPid();
383   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
384                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
385                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
386
387   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
388   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
389   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
390   smpi_execute_flops(comp_size);
391
392   TRACE_smpi_comm_out(my_proc_id);
393   log_timed_action (action, clock);
394 }
395
396 static void action_allReduce(simgrid::xbt::ReplayAction& action)
397 {
398   CHECK_ACTION_PARAMS(action, 2, 1)
399   double comm_size = parse_double(action[2]);
400   double comp_size = parse_double(action[3]);
401
402   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
403
404   double clock = smpi_process()->simulated_elapsed();
405   int my_proc_id = Actor::self()->getPid();
406   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
407                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
408
409   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
410   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
411   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
412   smpi_execute_flops(comp_size);
413
414   TRACE_smpi_comm_out(my_proc_id);
415   log_timed_action (action, clock);
416 }
417
418 static void action_allToAll(simgrid::xbt::ReplayAction& action)
419 {
420   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
421   double clock = smpi_process()->simulated_elapsed();
422   unsigned long comm_size = MPI_COMM_WORLD->size();
423   int send_size = parse_double(action[2]);
424   int recv_size = parse_double(action[3]);
425   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
426   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
427
428   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
429   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
430
431   int my_proc_id = Actor::self()->getPid();
432   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
433                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
434                                                     Datatype::encode(MPI_CURRENT_TYPE),
435                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
436
437   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
438
439   TRACE_smpi_comm_out(my_proc_id);
440   log_timed_action (action, clock);
441 }
442
443 static void action_gather(simgrid::xbt::ReplayAction& action)
444 {
445   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
446         0 gather 68 68 0 0 0
447       where:
448         1) 68 is the sendcounts
449         2) 68 is the recvcounts
450         3) 0 is the root node
451         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
452         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
453   */
454   CHECK_ACTION_PARAMS(action, 2, 3)
455   double clock = smpi_process()->simulated_elapsed();
456   unsigned long comm_size = MPI_COMM_WORLD->size();
457   int send_size = parse_double(action[2]);
458   int recv_size = parse_double(action[3]);
459   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
460   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
461
462   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
463   void *recv = nullptr;
464   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
465   int rank = MPI_COMM_WORLD->rank();
466
467   if(rank==root)
468     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
469
470   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
471                                                                         Datatype::encode(MPI_CURRENT_TYPE),
472                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
473
474   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
475
476   TRACE_smpi_comm_out(Actor::self()->getPid());
477   log_timed_action (action, clock);
478 }
479
480 static void action_scatter(simgrid::xbt::ReplayAction& action)
481 {
482   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
483         0 gather 68 68 0 0 0
484       where:
485         1) 68 is the sendcounts
486         2) 68 is the recvcounts
487         3) 0 is the root node
488         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
489         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
490   */
491   CHECK_ACTION_PARAMS(action, 2, 3)
492   double clock                   = smpi_process()->simulated_elapsed();
493   unsigned long comm_size        = MPI_COMM_WORLD->size();
494   int send_size                  = parse_double(action[2]);
495   int recv_size                  = parse_double(action[3]);
496   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
497   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
498
499   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
500   void* recv = nullptr;
501   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
502   int rank = MPI_COMM_WORLD->rank();
503
504   if (rank == root)
505     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
506
507   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
508                                                                         Datatype::encode(MPI_CURRENT_TYPE),
509                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
510
511   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
512
513   TRACE_smpi_comm_out(Actor::self()->getPid());
514   log_timed_action(action, clock);
515 }
516
517 static void action_gatherv(simgrid::xbt::ReplayAction& action)
518 {
519   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
520        0 gather 68 68 10 10 10 0 0 0
521      where:
522        1) 68 is the sendcount
523        2) 68 10 10 10 is the recvcounts
524        3) 0 is the root node
525        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
526        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
527   */
528   double clock = smpi_process()->simulated_elapsed();
529   unsigned long comm_size = MPI_COMM_WORLD->size();
530   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
531   int send_size = parse_double(action[2]);
532   std::vector<int> disps(comm_size, 0);
533   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
534
535   MPI_Datatype MPI_CURRENT_TYPE =
536       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
537   MPI_Datatype MPI_CURRENT_TYPE2{
538       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
539
540   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
541   void *recv = nullptr;
542   for (unsigned int i = 0; i < comm_size; i++) {
543     (*recvcounts)[i] = std::stoi(action[i + 3]);
544   }
545   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
546
547   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
548   int rank = MPI_COMM_WORLD->rank();
549
550   if(rank==root)
551     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
552
553   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
554                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
555                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
556
557   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
558                  MPI_COMM_WORLD);
559
560   TRACE_smpi_comm_out(Actor::self()->getPid());
561   log_timed_action (action, clock);
562 }
563
564 static void action_scatterv(simgrid::xbt::ReplayAction& action)
565 {
566   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
567        0 gather 68 10 10 10 68 0 0 0
568      where:
569        1) 68 10 10 10 is the sendcounts
570        2) 68 is the recvcount
571        3) 0 is the root node
572        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
573        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
574   */
575   double clock  = smpi_process()->simulated_elapsed();
576   unsigned long comm_size = MPI_COMM_WORLD->size();
577   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
578   int recv_size = parse_double(action[2 + comm_size]);
579   std::vector<int> disps(comm_size, 0);
580   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
581
582   MPI_Datatype MPI_CURRENT_TYPE =
583       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
584   MPI_Datatype MPI_CURRENT_TYPE2{
585       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
586
587   void* send = nullptr;
588   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
589   for (unsigned int i = 0; i < comm_size; i++) {
590     (*sendcounts)[i] = std::stoi(action[i + 2]);
591   }
592   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
593
594   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
595   int rank = MPI_COMM_WORLD->rank();
596
597   if (rank == root)
598     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
599
600   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
601                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
602                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
603
604   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
605                   MPI_COMM_WORLD);
606
607   TRACE_smpi_comm_out(Actor::self()->getPid());
608   log_timed_action(action, clock);
609 }
610
611 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
612 {
613   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
614        0 reduceScatter 275427 275427 275427 204020 11346849 0
615      where:
616        1) The first four values after the name of the action declare the recvcounts array
617        2) The value 11346849 is the amount of instructions
618        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
619  */
620   double clock = smpi_process()->simulated_elapsed();
621   unsigned long comm_size = MPI_COMM_WORLD->size();
622   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
623   int comp_size = parse_double(action[2+comm_size]);
624   int my_proc_id                     = Actor::self()->getPid();
625   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
626   MPI_Datatype MPI_CURRENT_TYPE =
627       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
628
629   for (unsigned int i = 0; i < comm_size; i++) {
630     recvcounts->push_back(std::stoi(action[i + 2]));
631   }
632   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
633
634   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
635                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
636                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
637                                                        Datatype::encode(MPI_CURRENT_TYPE)));
638
639   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
640   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
641
642   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
643   smpi_execute_flops(comp_size);
644
645   TRACE_smpi_comm_out(my_proc_id);
646   log_timed_action (action, clock);
647 }
648
649 static void action_allgather(simgrid::xbt::ReplayAction& action)
650 {
651   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
652         0 allGather 275427 275427
653     where:
654         1) 275427 is the sendcount
655         2) 275427 is the recvcount
656         3) No more values mean that the datatype for sent and receive buffer is the default one, see
657     simgrid::smpi::Datatype::decode().
658   */
659   double clock = smpi_process()->simulated_elapsed();
660
661   CHECK_ACTION_PARAMS(action, 2, 2)
662   int sendcount = std::stoi(action[2]);
663   int recvcount = std::stoi(action[3]);
664
665   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
666   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
667
668   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
669   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
670
671   int my_proc_id = Actor::self()->getPid();
672
673   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
674                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
675                                                     Datatype::encode(MPI_CURRENT_TYPE),
676                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
677
678   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
679
680   TRACE_smpi_comm_out(my_proc_id);
681   log_timed_action (action, clock);
682 }
683
684 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
685 {
686   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
687         0 allGatherV 275427 275427 275427 275427 204020
688      where:
689         1) 275427 is the sendcount
690         2) The next four elements declare the recvcounts array
691         3) No more values mean that the datatype for sent and receive buffer is the default one, see
692      simgrid::smpi::Datatype::decode().
693   */
694   double clock = smpi_process()->simulated_elapsed();
695
696   unsigned long comm_size = MPI_COMM_WORLD->size();
697   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
698   int sendcount = std::stoi(action[2]);
699   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
700   std::vector<int> disps(comm_size, 0);
701
702   int datatype_index = 0, disp_index = 0;
703   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
704     datatype_index = 3 + comm_size;
705     disp_index     = datatype_index + 1;
706   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
707     datatype_index = -1;
708     disp_index     = 3 + comm_size;
709   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
710     datatype_index = 3 + comm_size;
711   }
712
713   if (disp_index != 0) {
714     for (unsigned int i = 0; i < comm_size; i++)
715       disps[i]          = std::stoi(action[disp_index + i]);
716   }
717
718   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
719                                                      : MPI_DEFAULT_TYPE};
720   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
721                                                       : MPI_DEFAULT_TYPE};
722
723   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
724
725   for (unsigned int i = 0; i < comm_size; i++) {
726     (*recvcounts)[i] = std::stoi(action[i + 3]);
727   }
728   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
729   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
730
731   int my_proc_id = Actor::self()->getPid();
732
733   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
734                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
735                                                        Datatype::encode(MPI_CURRENT_TYPE),
736                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
737
738   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
739                     MPI_COMM_WORLD);
740
741   TRACE_smpi_comm_out(my_proc_id);
742   log_timed_action (action, clock);
743 }
744
745 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
746 {
747   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
748         0 allToAllV 100 1 7 10 12 100 1 70 10 5
749      where:
750         1) 100 is the size of the send buffer *sizeof(int),
751         2) 1 7 10 12 is the sendcounts array
752         3) 100*sizeof(int) is the size of the receiver buffer
753         4)  1 70 10 5 is the recvcounts array
754   */
755   double clock = smpi_process()->simulated_elapsed();
756
757   unsigned long comm_size = MPI_COMM_WORLD->size();
758   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
759   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
760   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
761   std::vector<int> senddisps(comm_size, 0);
762   std::vector<int> recvdisps(comm_size, 0);
763
764   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
765                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
766                                       : MPI_DEFAULT_TYPE;
767   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
768                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
769                                      : MPI_DEFAULT_TYPE};
770
771   int send_buf_size=parse_double(action[2]);
772   int recv_buf_size=parse_double(action[3+comm_size]);
773   int my_proc_id = Actor::self()->getPid();
774   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
775   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
776
777   for (unsigned int i = 0; i < comm_size; i++) {
778     (*sendcounts)[i] = std::stoi(action[3 + i]);
779     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
780   }
781   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
782   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
783
784   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
785                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
786                                                        Datatype::encode(MPI_CURRENT_TYPE),
787                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
788
789   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
790                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
791
792   TRACE_smpi_comm_out(my_proc_id);
793   log_timed_action (action, clock);
794 }
795
796 }} // namespace simgrid::smpi
797
798 /** @brief Only initialize the replay, don't do it for real */
799 void smpi_replay_init(int* argc, char*** argv)
800 {
801   simgrid::smpi::Process::init(argc, argv);
802   smpi_process()->mark_as_initialized();
803   smpi_process()->set_replaying(true);
804
805   int my_proc_id = Actor::self()->getPid();
806   TRACE_smpi_init(my_proc_id);
807   TRACE_smpi_computing_init(my_proc_id);
808   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
809   TRACE_smpi_comm_out(my_proc_id);
810   xbt_replay_action_register("init",       simgrid::smpi::action_init);
811   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
812   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
813   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
814   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
815
816   std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
817   std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
818   std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
819   std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
820   std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
821
822   xbt_replay_action_register("send",
823                              std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
824   xbt_replay_action_register("Isend",
825                              std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
826   xbt_replay_action_register("recv",
827                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
828   xbt_replay_action_register("Irecv",
829                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
830   xbt_replay_action_register("test", simgrid::smpi::action_test);
831   xbt_replay_action_register("wait",
832                              std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
833   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
834   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
835   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
836   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
837   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
838   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
839   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
840   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
841   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
842   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
843   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
844   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
845   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
846   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
847   xbt_replay_action_register("compute", simgrid::smpi::action_compute);
848
849   //if we have a delayed start, sleep here.
850   if(*argc>2){
851     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
852     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
853     smpi_execute_flops(value);
854   } else {
855     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
856     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
857     smpi_execute_flops(0.0);
858   }
859 }
860
861 /** @brief actually run the replay after initialization */
862 void smpi_replay_main(int* argc, char*** argv)
863 {
864   simgrid::xbt::replay_runner(*argc, *argv);
865
866   /* and now, finalize everything */
867   /* One active process will stop. Decrease the counter*/
868   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
869   if (not get_reqq_self()->empty()) {
870     unsigned int count_requests=get_reqq_self()->size();
871     MPI_Request requests[count_requests];
872     MPI_Status status[count_requests];
873     unsigned int i=0;
874
875     for (auto const& req : *get_reqq_self()) {
876       requests[i] = req;
877       i++;
878     }
879     simgrid::smpi::Request::waitall(count_requests, requests, status);
880   }
881   delete get_reqq_self();
882   active_processes--;
883
884   if(active_processes==0){
885     /* Last process alive speaking: end the simulated timer */
886     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
887     smpi_free_replay_tmp_buffers();
888   }
889
890   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
891
892   smpi_process()->finalize();
893
894   TRACE_smpi_comm_out(Actor::self()->getPid());
895   TRACE_smpi_finalize(Actor::self()->getPid());
896 }
897
898 /** @brief chain a replay initialization and a replay start */
899 void smpi_replay_run(int* argc, char*** argv)
900 {
901   smpi_replay_init(argc, argv);
902   smpi_replay_main(argc, argv);
903 }