Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
15fdadaee9cac8c8bcd22c93271cacdcbdc6ab75
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 {
44   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45     std::string s = boost::algorithm::join(action, " ");
46     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
47   }
48 }
49
50 static std::vector<MPI_Request>* get_reqq_self()
51 {
52   return reqq.at(Actor::self()->getPid());
53 }
54
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 {
57    reqq.insert({Actor::self()->getPid(), mpi_request});
58 }
59
60 /* Helper function */
61 static double parse_double(std::string string)
62 {
63   return xbt_str_parse_double(string.c_str(), "%s is not a double");
64 }
65
66 namespace simgrid {
67 namespace smpi {
68
69 namespace Replay {
70 class ActionArgParser {
71 public:
72   virtual void parse(simgrid::xbt::ReplayAction& action){};
73 };
74
75 class SendRecvParser : public ActionArgParser {
76 public:
77   /* communication partner; if we send, this is the receiver and vice versa */
78   int partner;
79   double size;
80   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81
82   void parse(simgrid::xbt::ReplayAction& action) override
83   {
84     CHECK_ACTION_PARAMS(action, 2, 1)
85     partner = std::stoi(action[2]);
86     size    = parse_double(action[3]);
87     if (action.size() > 4)
88       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
89   }
90 };
91
92 class ComputeParser : public ActionArgParser {
93 public:
94   /* communication partner; if we send, this is the receiver and vice versa */
95   double flops;
96
97   void parse(simgrid::xbt::ReplayAction& action) override
98   {
99     CHECK_ACTION_PARAMS(action, 1, 0)
100     flops = parse_double(action[2]);
101   }
102 };
103
104 template <class T> class ReplayAction {
105 protected:
106   const std::string name;
107   T args;
108
109   /*
110    * Used to compute the duration of this action.
111    */
112   double start_time;
113
114   int my_proc_id;
115
116 public:
117   explicit ReplayAction(std::string name) : name(name), start_time(0), my_proc_id(simgrid::s4u::Actor::self()->getPid())
118   {
119   }
120
121   virtual void execute(simgrid::xbt::ReplayAction& action)
122   {
123     // Needs to be re-initialized for every action, hence here
124     start_time = smpi_process()->simulated_elapsed();
125     args.parse(action);
126     kernel(action);
127     log_timed_action(action, start_time);
128   }
129
130   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
131 };
132
133 class WaitAction : public ReplayAction<ActionArgParser> {
134 public:
135   WaitAction() : ReplayAction("Wait") {}
136   void kernel(simgrid::xbt::ReplayAction& action) override
137   {
138     CHECK_ACTION_PARAMS(action, 0, 0)
139     MPI_Status status;
140
141     std::string s = boost::algorithm::join(action, " ");
142     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
143     MPI_Request request = get_reqq_self()->back();
144     get_reqq_self()->pop_back();
145
146     if (request == nullptr) {
147       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
148        * return.*/
149       return;
150     }
151
152     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
153
154     MPI_Group group          = request->comm()->group();
155     int src_traced           = group->rank(request->src());
156     int dst_traced           = group->rank(request->dst());
157     bool is_wait_for_receive = (request->flags() & RECV);
158     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
159     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
160
161     Request::wait(&request, &status);
162
163     TRACE_smpi_comm_out(rank);
164     if (is_wait_for_receive)
165       TRACE_smpi_recv(src_traced, dst_traced, 0);
166   }
167 };
168
169 class SendAction : public ReplayAction<SendRecvParser> {
170 public:
171   SendAction() = delete;
172   SendAction(std::string name) : ReplayAction(name) {}
173   void kernel(simgrid::xbt::ReplayAction& action) override
174   {
175     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
176
177     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
178                                                                                  Datatype::encode(args.datatype1)));
179     if (not TRACE_smpi_view_internals())
180       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
181
182     if (name == "send") {
183       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
184     } else if (name == "Isend") {
185       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
186       get_reqq_self()->push_back(request);
187     } else {
188       xbt_die("Don't know this action, %s", name.c_str());
189     }
190
191     TRACE_smpi_comm_out(my_proc_id);
192   }
193 };
194
195 class RecvAction : public ReplayAction<SendRecvParser> {
196 public:
197   RecvAction() = delete;
198   explicit RecvAction(std::string name) : ReplayAction(name) {}
199   void kernel(simgrid::xbt::ReplayAction& action) override
200   {
201     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
202
203     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
204                                                                                  Datatype::encode(args.datatype1)));
205
206     MPI_Status status;
207     // unknown size from the receiver point of view
208     if (args.size <= 0.0) {
209       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
210       args.size = status.count;
211     }
212
213     if (name == "recv") {
214       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
215     } else if (name == "Irecv") {
216       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
217       get_reqq_self()->push_back(request);
218     }
219
220     TRACE_smpi_comm_out(my_proc_id);
221     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
222     if (name == "recv" && not TRACE_smpi_view_internals()) {
223       TRACE_smpi_recv(src_traced, my_proc_id, 0);
224     }
225   }
226 };
227
228 class ComputeAction : public ReplayAction<ComputeParser> {
229 public:
230   ComputeAction() : ReplayAction("compute") {}
231   void kernel(simgrid::xbt::ReplayAction& action) override
232   {
233     TRACE_smpi_computing_in(my_proc_id, args.flops);
234     smpi_execute_flops(args.flops);
235     TRACE_smpi_computing_out(my_proc_id);
236   }
237 };
238
239 } // Replay Namespace
240
241 static void action_init(simgrid::xbt::ReplayAction& action)
242 {
243   XBT_DEBUG("Initialize the counters");
244   CHECK_ACTION_PARAMS(action, 0, 1)
245   if (action.size() > 2)
246     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
247   else
248     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
249
250   /* start a simulated timer */
251   smpi_process()->simulated_start();
252   /*initialize the number of active processes */
253   active_processes = smpi_process_count();
254
255   set_reqq_self(new std::vector<MPI_Request>);
256 }
257
258 static void action_finalize(simgrid::xbt::ReplayAction& action)
259 {
260   /* Nothing to do */
261 }
262
263 static void action_comm_size(simgrid::xbt::ReplayAction& action)
264 {
265   communicator_size = parse_double(action[2]);
266   log_timed_action (action, smpi_process()->simulated_elapsed());
267 }
268
269 static void action_comm_split(simgrid::xbt::ReplayAction& action)
270 {
271   log_timed_action (action, smpi_process()->simulated_elapsed());
272 }
273
274 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
275 {
276   log_timed_action (action, smpi_process()->simulated_elapsed());
277 }
278
279 static void action_compute(simgrid::xbt::ReplayAction& action)
280 {
281   Replay::ComputeAction().execute(action);
282 }
283
284 static void action_test(simgrid::xbt::ReplayAction& action)
285 {
286   CHECK_ACTION_PARAMS(action, 0, 0)
287   double clock = smpi_process()->simulated_elapsed();
288   MPI_Status status;
289
290   MPI_Request request = get_reqq_self()->back();
291   get_reqq_self()->pop_back();
292   //if request is null here, this may mean that a previous test has succeeded
293   //Different times in traced application and replayed version may lead to this
294   //In this case, ignore the extra calls.
295   if(request!=nullptr){
296     int my_proc_id = Actor::self()->getPid();
297     TRACE_smpi_testing_in(my_proc_id);
298
299     int flag = Request::test(&request, &status);
300
301     XBT_DEBUG("MPI_Test result: %d", flag);
302     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
303     get_reqq_self()->push_back(request);
304
305     TRACE_smpi_testing_out(my_proc_id);
306   }
307   log_timed_action (action, clock);
308 }
309
310 static void action_waitall(simgrid::xbt::ReplayAction& action)
311 {
312   CHECK_ACTION_PARAMS(action, 0, 0)
313   double clock = smpi_process()->simulated_elapsed();
314   const unsigned int count_requests = get_reqq_self()->size();
315
316   if (count_requests>0) {
317     MPI_Status status[count_requests];
318
319     int my_proc_id_traced = Actor::self()->getPid();
320     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
321                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
322     int recvs_snd[count_requests];
323     int recvs_rcv[count_requests];
324     for (unsigned int i = 0; i < count_requests; i++) {
325       const auto& req = (*get_reqq_self())[i];
326       if (req && (req->flags() & RECV)) {
327         recvs_snd[i] = req->src();
328         recvs_rcv[i] = req->dst();
329       } else
330         recvs_snd[i] = -100;
331    }
332    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
333
334    for (unsigned i = 0; i < count_requests; i++) {
335      if (recvs_snd[i]!=-100)
336        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
337    }
338    TRACE_smpi_comm_out(my_proc_id_traced);
339   }
340   log_timed_action (action, clock);
341 }
342
343 static void action_barrier(simgrid::xbt::ReplayAction& action)
344 {
345   double clock = smpi_process()->simulated_elapsed();
346   int my_proc_id = Actor::self()->getPid();
347   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
348
349   Colls::barrier(MPI_COMM_WORLD);
350
351   TRACE_smpi_comm_out(my_proc_id);
352   log_timed_action (action, clock);
353 }
354
355 static void action_bcast(simgrid::xbt::ReplayAction& action)
356 {
357   CHECK_ACTION_PARAMS(action, 1, 2)
358   double size = parse_double(action[2]);
359   double clock = smpi_process()->simulated_elapsed();
360   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
361   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
362   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
363
364   int my_proc_id = Actor::self()->getPid();
365   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
366                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
367                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
368
369   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
370
371   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
372
373   TRACE_smpi_comm_out(my_proc_id);
374   log_timed_action (action, clock);
375 }
376
377 static void action_reduce(simgrid::xbt::ReplayAction& action)
378 {
379   CHECK_ACTION_PARAMS(action, 2, 2)
380   double comm_size = parse_double(action[2]);
381   double comp_size = parse_double(action[3]);
382   double clock = smpi_process()->simulated_elapsed();
383   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
384
385   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
386
387   int my_proc_id = Actor::self()->getPid();
388   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
389                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
390                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
391
392   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
393   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
394   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
395   smpi_execute_flops(comp_size);
396
397   TRACE_smpi_comm_out(my_proc_id);
398   log_timed_action (action, clock);
399 }
400
401 static void action_allReduce(simgrid::xbt::ReplayAction& action)
402 {
403   CHECK_ACTION_PARAMS(action, 2, 1)
404   double comm_size = parse_double(action[2]);
405   double comp_size = parse_double(action[3]);
406
407   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
408
409   double clock = smpi_process()->simulated_elapsed();
410   int my_proc_id = Actor::self()->getPid();
411   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
412                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
413
414   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
415   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
416   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
417   smpi_execute_flops(comp_size);
418
419   TRACE_smpi_comm_out(my_proc_id);
420   log_timed_action (action, clock);
421 }
422
423 static void action_allToAll(simgrid::xbt::ReplayAction& action)
424 {
425   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
426   double clock = smpi_process()->simulated_elapsed();
427   unsigned long comm_size = MPI_COMM_WORLD->size();
428   int send_size = parse_double(action[2]);
429   int recv_size = parse_double(action[3]);
430   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
431   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
432
433   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
434   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
435
436   int my_proc_id = Actor::self()->getPid();
437   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
438                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
439                                                     Datatype::encode(MPI_CURRENT_TYPE),
440                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
441
442   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
443
444   TRACE_smpi_comm_out(my_proc_id);
445   log_timed_action (action, clock);
446 }
447
448 static void action_gather(simgrid::xbt::ReplayAction& action)
449 {
450   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
451         0 gather 68 68 0 0 0
452       where:
453         1) 68 is the sendcounts
454         2) 68 is the recvcounts
455         3) 0 is the root node
456         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
457         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
458   */
459   CHECK_ACTION_PARAMS(action, 2, 3)
460   double clock = smpi_process()->simulated_elapsed();
461   unsigned long comm_size = MPI_COMM_WORLD->size();
462   int send_size = parse_double(action[2]);
463   int recv_size = parse_double(action[3]);
464   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
465   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
466
467   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
468   void *recv = nullptr;
469   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
470   int rank = MPI_COMM_WORLD->rank();
471
472   if(rank==root)
473     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
474
475   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
476                                                                         Datatype::encode(MPI_CURRENT_TYPE),
477                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
478
479   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
480
481   TRACE_smpi_comm_out(Actor::self()->getPid());
482   log_timed_action (action, clock);
483 }
484
485 static void action_scatter(simgrid::xbt::ReplayAction& action)
486 {
487   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
488         0 gather 68 68 0 0 0
489       where:
490         1) 68 is the sendcounts
491         2) 68 is the recvcounts
492         3) 0 is the root node
493         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
494         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
495   */
496   CHECK_ACTION_PARAMS(action, 2, 3)
497   double clock                   = smpi_process()->simulated_elapsed();
498   unsigned long comm_size        = MPI_COMM_WORLD->size();
499   int send_size                  = parse_double(action[2]);
500   int recv_size                  = parse_double(action[3]);
501   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
502   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
503
504   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
505   void* recv = nullptr;
506   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
507   int rank = MPI_COMM_WORLD->rank();
508
509   if (rank == root)
510     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
511
512   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
513                                                                         Datatype::encode(MPI_CURRENT_TYPE),
514                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
515
516   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
517
518   TRACE_smpi_comm_out(Actor::self()->getPid());
519   log_timed_action(action, clock);
520 }
521
522 static void action_gatherv(simgrid::xbt::ReplayAction& action)
523 {
524   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
525        0 gather 68 68 10 10 10 0 0 0
526      where:
527        1) 68 is the sendcount
528        2) 68 10 10 10 is the recvcounts
529        3) 0 is the root node
530        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
531        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
532   */
533   double clock = smpi_process()->simulated_elapsed();
534   unsigned long comm_size = MPI_COMM_WORLD->size();
535   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
536   int send_size = parse_double(action[2]);
537   std::vector<int> disps(comm_size, 0);
538   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
539
540   MPI_Datatype MPI_CURRENT_TYPE =
541       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
542   MPI_Datatype MPI_CURRENT_TYPE2{
543       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
544
545   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
546   void *recv = nullptr;
547   for (unsigned int i = 0; i < comm_size; i++) {
548     (*recvcounts)[i] = std::stoi(action[i + 3]);
549   }
550   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
551
552   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
553   int rank = MPI_COMM_WORLD->rank();
554
555   if(rank==root)
556     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
557
558   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
559                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
560                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
561
562   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
563                  MPI_COMM_WORLD);
564
565   TRACE_smpi_comm_out(Actor::self()->getPid());
566   log_timed_action (action, clock);
567 }
568
569 static void action_scatterv(simgrid::xbt::ReplayAction& action)
570 {
571   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
572        0 gather 68 10 10 10 68 0 0 0
573      where:
574        1) 68 10 10 10 is the sendcounts
575        2) 68 is the recvcount
576        3) 0 is the root node
577        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
578        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
579   */
580   double clock  = smpi_process()->simulated_elapsed();
581   unsigned long comm_size = MPI_COMM_WORLD->size();
582   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
583   int recv_size = parse_double(action[2 + comm_size]);
584   std::vector<int> disps(comm_size, 0);
585   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
586
587   MPI_Datatype MPI_CURRENT_TYPE =
588       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
589   MPI_Datatype MPI_CURRENT_TYPE2{
590       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
591
592   void* send = nullptr;
593   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
594   for (unsigned int i = 0; i < comm_size; i++) {
595     (*sendcounts)[i] = std::stoi(action[i + 2]);
596   }
597   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
598
599   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
600   int rank = MPI_COMM_WORLD->rank();
601
602   if (rank == root)
603     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
604
605   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
606                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
607                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
608
609   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
610                   MPI_COMM_WORLD);
611
612   TRACE_smpi_comm_out(Actor::self()->getPid());
613   log_timed_action(action, clock);
614 }
615
616 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
617 {
618   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
619        0 reduceScatter 275427 275427 275427 204020 11346849 0
620      where:
621        1) The first four values after the name of the action declare the recvcounts array
622        2) The value 11346849 is the amount of instructions
623        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
624  */
625   double clock = smpi_process()->simulated_elapsed();
626   unsigned long comm_size = MPI_COMM_WORLD->size();
627   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
628   int comp_size = parse_double(action[2+comm_size]);
629   int my_proc_id                     = Actor::self()->getPid();
630   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
631   MPI_Datatype MPI_CURRENT_TYPE =
632       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
633
634   for (unsigned int i = 0; i < comm_size; i++) {
635     recvcounts->push_back(std::stoi(action[i + 2]));
636   }
637   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
638
639   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
640                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
641                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
642                                                        Datatype::encode(MPI_CURRENT_TYPE)));
643
644   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
645   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
646
647   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
648   smpi_execute_flops(comp_size);
649
650   TRACE_smpi_comm_out(my_proc_id);
651   log_timed_action (action, clock);
652 }
653
654 static void action_allgather(simgrid::xbt::ReplayAction& action)
655 {
656   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
657         0 allGather 275427 275427
658     where:
659         1) 275427 is the sendcount
660         2) 275427 is the recvcount
661         3) No more values mean that the datatype for sent and receive buffer is the default one, see
662     simgrid::smpi::Datatype::decode().
663   */
664   double clock = smpi_process()->simulated_elapsed();
665
666   CHECK_ACTION_PARAMS(action, 2, 2)
667   int sendcount = std::stoi(action[2]);
668   int recvcount = std::stoi(action[3]);
669
670   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
671   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
672
673   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
674   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
675
676   int my_proc_id = Actor::self()->getPid();
677
678   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
679                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
680                                                     Datatype::encode(MPI_CURRENT_TYPE),
681                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
682
683   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
684
685   TRACE_smpi_comm_out(my_proc_id);
686   log_timed_action (action, clock);
687 }
688
689 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
690 {
691   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
692         0 allGatherV 275427 275427 275427 275427 204020
693      where:
694         1) 275427 is the sendcount
695         2) The next four elements declare the recvcounts array
696         3) No more values mean that the datatype for sent and receive buffer is the default one, see
697      simgrid::smpi::Datatype::decode().
698   */
699   double clock = smpi_process()->simulated_elapsed();
700
701   unsigned long comm_size = MPI_COMM_WORLD->size();
702   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
703   int sendcount = std::stoi(action[2]);
704   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
705   std::vector<int> disps(comm_size, 0);
706
707   int datatype_index = 0, disp_index = 0;
708   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
709     datatype_index = 3 + comm_size;
710     disp_index     = datatype_index + 1;
711   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
712     datatype_index = -1;
713     disp_index     = 3 + comm_size;
714   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
715     datatype_index = 3 + comm_size;
716   }
717
718   if (disp_index != 0) {
719     for (unsigned int i = 0; i < comm_size; i++)
720       disps[i]          = std::stoi(action[disp_index + i]);
721   }
722
723   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
724                                                      : MPI_DEFAULT_TYPE};
725   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
726                                                       : MPI_DEFAULT_TYPE};
727
728   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
729
730   for (unsigned int i = 0; i < comm_size; i++) {
731     (*recvcounts)[i] = std::stoi(action[i + 3]);
732   }
733   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
734   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
735
736   int my_proc_id = Actor::self()->getPid();
737
738   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
739                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
740                                                        Datatype::encode(MPI_CURRENT_TYPE),
741                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
742
743   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
744                     MPI_COMM_WORLD);
745
746   TRACE_smpi_comm_out(my_proc_id);
747   log_timed_action (action, clock);
748 }
749
750 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
751 {
752   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
753         0 allToAllV 100 1 7 10 12 100 1 70 10 5
754      where:
755         1) 100 is the size of the send buffer *sizeof(int),
756         2) 1 7 10 12 is the sendcounts array
757         3) 100*sizeof(int) is the size of the receiver buffer
758         4)  1 70 10 5 is the recvcounts array
759   */
760   double clock = smpi_process()->simulated_elapsed();
761
762   unsigned long comm_size = MPI_COMM_WORLD->size();
763   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
764   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
765   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
766   std::vector<int> senddisps(comm_size, 0);
767   std::vector<int> recvdisps(comm_size, 0);
768
769   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
770                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
771                                       : MPI_DEFAULT_TYPE;
772   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
773                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
774                                      : MPI_DEFAULT_TYPE};
775
776   int send_buf_size=parse_double(action[2]);
777   int recv_buf_size=parse_double(action[3+comm_size]);
778   int my_proc_id = Actor::self()->getPid();
779   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
780   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
781
782   for (unsigned int i = 0; i < comm_size; i++) {
783     (*sendcounts)[i] = std::stoi(action[3 + i]);
784     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
785   }
786   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
787   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
788
789   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
790                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
791                                                        Datatype::encode(MPI_CURRENT_TYPE),
792                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
793
794   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
795                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
796
797   TRACE_smpi_comm_out(my_proc_id);
798   log_timed_action (action, clock);
799 }
800
801 }} // namespace simgrid::smpi
802
803 /** @brief Only initialize the replay, don't do it for real */
804 void smpi_replay_init(int* argc, char*** argv)
805 {
806   simgrid::smpi::Process::init(argc, argv);
807   smpi_process()->mark_as_initialized();
808   smpi_process()->set_replaying(true);
809
810   int my_proc_id = Actor::self()->getPid();
811   TRACE_smpi_init(my_proc_id);
812   TRACE_smpi_computing_init(my_proc_id);
813   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
814   TRACE_smpi_comm_out(my_proc_id);
815   xbt_replay_action_register("init",       simgrid::smpi::action_init);
816   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
817   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
818   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
819   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
820
821   std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
822   std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
823   std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
824   std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
825   std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
826
827   xbt_replay_action_register("send",
828                              std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
829   xbt_replay_action_register("Isend",
830                              std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
831   xbt_replay_action_register("recv",
832                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
833   xbt_replay_action_register("Irecv",
834                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
835   xbt_replay_action_register("test", simgrid::smpi::action_test);
836   xbt_replay_action_register("wait",
837                              std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
838   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
839   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
840   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
841   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
842   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
843   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
844   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
845   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
846   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
847   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
848   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
849   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
850   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
851   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
852   xbt_replay_action_register("compute", simgrid::smpi::action_compute);
853
854   //if we have a delayed start, sleep here.
855   if(*argc>2){
856     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
857     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
858     smpi_execute_flops(value);
859   } else {
860     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
861     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
862     smpi_execute_flops(0.0);
863   }
864 }
865
866 /** @brief actually run the replay after initialization */
867 void smpi_replay_main(int* argc, char*** argv)
868 {
869   simgrid::xbt::replay_runner(*argc, *argv);
870
871   /* and now, finalize everything */
872   /* One active process will stop. Decrease the counter*/
873   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
874   if (not get_reqq_self()->empty()) {
875     unsigned int count_requests=get_reqq_self()->size();
876     MPI_Request requests[count_requests];
877     MPI_Status status[count_requests];
878     unsigned int i=0;
879
880     for (auto const& req : *get_reqq_self()) {
881       requests[i] = req;
882       i++;
883     }
884     simgrid::smpi::Request::waitall(count_requests, requests, status);
885   }
886   delete get_reqq_self();
887   active_processes--;
888
889   if(active_processes==0){
890     /* Last process alive speaking: end the simulated timer */
891     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
892     smpi_free_replay_tmp_buffers();
893   }
894
895   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
896
897   smpi_process()->finalize();
898
899   TRACE_smpi_comm_out(Actor::self()->getPid());
900   TRACE_smpi_finalize(Actor::self()->getPid());
901 }
902
903 /** @brief chain a replay initialization and a replay start */
904 void smpi_replay_run(int* argc, char*** argv)
905 {
906   smpi_replay_init(argc, argv);
907   smpi_replay_main(argc, argv);
908 }