Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Use std::bind / std::function for the wait action
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 {
44   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45     std::string s = boost::algorithm::join(action, " ");
46     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
47   }
48 }
49
50 static std::vector<MPI_Request>* get_reqq_self()
51 {
52   return reqq.at(Actor::self()->getPid());
53 }
54
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 {
57    reqq.insert({Actor::self()->getPid(), mpi_request});
58 }
59
60 /* Helper function */
61 static double parse_double(std::string string)
62 {
63   return xbt_str_parse_double(string.c_str(), "%s is not a double");
64 }
65
66 namespace simgrid {
67 namespace smpi {
68
69 namespace Replay {
70 class ActionArgParser {
71 public:
72   virtual void parse(simgrid::xbt::ReplayAction& action){};
73 };
74
75 class SendRecvParser : public ActionArgParser {
76 public:
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 template <class T> class ReplayAction {
92 protected:
93   const std::string name;
94   T args;
95
96   /*
97    * Used to compute the duration of this action.
98    */
99   double start_time;
100
101   int my_proc_id;
102
103 public:
104   explicit ReplayAction(std::string name) : name(name), start_time(0), my_proc_id(simgrid::s4u::Actor::self()->getPid())
105   {
106   }
107
108   virtual void execute(simgrid::xbt::ReplayAction& action)
109   {
110     // Needs to be re-initialized for every action, hence here
111     start_time = smpi_process()->simulated_elapsed();
112     args.parse(action);
113     kernel(action);
114     log_timed_action(action, start_time);
115   }
116
117   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
118 };
119
120 class WaitAction : public ReplayAction<ActionArgParser> {
121 public:
122   WaitAction() : ReplayAction("Wait") {}
123   void kernel(simgrid::xbt::ReplayAction& action) override
124   {
125     CHECK_ACTION_PARAMS(action, 0, 0)
126     MPI_Status status;
127
128     std::string s = boost::algorithm::join(action, " ");
129     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
130     MPI_Request request = get_reqq_self()->back();
131     get_reqq_self()->pop_back();
132
133     if (request == nullptr) {
134       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
135        * return.*/
136       return;
137     }
138
139     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
140
141     MPI_Group group          = request->comm()->group();
142     int src_traced           = group->rank(request->src());
143     int dst_traced           = group->rank(request->dst());
144     bool is_wait_for_receive = (request->flags() & RECV);
145     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
146     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
147
148     Request::wait(&request, &status);
149
150     TRACE_smpi_comm_out(rank);
151     if (is_wait_for_receive)
152       TRACE_smpi_recv(src_traced, dst_traced, 0);
153   }
154 };
155
156 class SendAction : public ReplayAction<SendRecvParser> {
157 public:
158   SendAction() = delete;
159   SendAction(std::string name) : ReplayAction(name) {}
160   void kernel(simgrid::xbt::ReplayAction& action) override
161   {
162     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
163
164     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
165                                                                                  Datatype::encode(args.datatype1)));
166     if (not TRACE_smpi_view_internals())
167       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
168
169     if (name == "send") {
170       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
171     } else if (name == "Isend") {
172       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
173       get_reqq_self()->push_back(request);
174     } else {
175       xbt_die("Don't know this action, %s", name.c_str());
176     }
177
178     TRACE_smpi_comm_out(my_proc_id);
179   }
180 };
181
182 class RecvAction : public ReplayAction<SendRecvParser> {
183 public:
184   RecvAction() = delete;
185   explicit RecvAction(std::string name) : ReplayAction(name) {}
186   void kernel(simgrid::xbt::ReplayAction& action) override
187   {
188     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
189
190     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
191                                                                                  Datatype::encode(args.datatype1)));
192
193     MPI_Status status;
194     // unknown size from the receiver point of view
195     if (args.size <= 0.0) {
196       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
197       args.size = status.count;
198     }
199
200     if (name == "recv") {
201       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
202     } else if (name == "Irecv") {
203       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
204       get_reqq_self()->push_back(request);
205     }
206
207     TRACE_smpi_comm_out(my_proc_id);
208     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
209     if (name == "recv" && not TRACE_smpi_view_internals()) {
210       TRACE_smpi_recv(src_traced, my_proc_id, 0);
211     }
212   }
213 };
214
215 } // Replay Namespace
216
217 static void action_init(simgrid::xbt::ReplayAction& action)
218 {
219   XBT_DEBUG("Initialize the counters");
220   CHECK_ACTION_PARAMS(action, 0, 1)
221   if (action.size() > 2)
222     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
223   else
224     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
225
226   /* start a simulated timer */
227   smpi_process()->simulated_start();
228   /*initialize the number of active processes */
229   active_processes = smpi_process_count();
230
231   set_reqq_self(new std::vector<MPI_Request>);
232 }
233
234 static void action_finalize(simgrid::xbt::ReplayAction& action)
235 {
236   /* Nothing to do */
237 }
238
239 static void action_comm_size(simgrid::xbt::ReplayAction& action)
240 {
241   communicator_size = parse_double(action[2]);
242   log_timed_action (action, smpi_process()->simulated_elapsed());
243 }
244
245 static void action_comm_split(simgrid::xbt::ReplayAction& action)
246 {
247   log_timed_action (action, smpi_process()->simulated_elapsed());
248 }
249
250 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
251 {
252   log_timed_action (action, smpi_process()->simulated_elapsed());
253 }
254
255 static void action_compute(simgrid::xbt::ReplayAction& action)
256 {
257   CHECK_ACTION_PARAMS(action, 1, 0)
258   double clock = smpi_process()->simulated_elapsed();
259   double flops= parse_double(action[2]);
260   int my_proc_id = Actor::self()->getPid();
261
262   TRACE_smpi_computing_in(my_proc_id, flops);
263   smpi_execute_flops(flops);
264   TRACE_smpi_computing_out(my_proc_id);
265
266   log_timed_action (action, clock);
267 }
268
269 static void action_send(simgrid::xbt::ReplayAction& action)
270 {
271   Replay::SendAction("send").execute(action);
272 }
273
274 static void action_Isend(simgrid::xbt::ReplayAction& action)
275 {
276   Replay::SendAction("Isend").execute(action);
277 }
278
279 static void action_recv(simgrid::xbt::ReplayAction& action)
280 {
281   Replay::RecvAction("recv").execute(action);
282 }
283
284 static void action_Irecv(simgrid::xbt::ReplayAction& action)
285 {
286   Replay::RecvAction("Irecv").execute(action);
287 }
288
289 static void action_test(simgrid::xbt::ReplayAction& action)
290 {
291   CHECK_ACTION_PARAMS(action, 0, 0)
292   double clock = smpi_process()->simulated_elapsed();
293   MPI_Status status;
294
295   MPI_Request request = get_reqq_self()->back();
296   get_reqq_self()->pop_back();
297   //if request is null here, this may mean that a previous test has succeeded
298   //Different times in traced application and replayed version may lead to this
299   //In this case, ignore the extra calls.
300   if(request!=nullptr){
301     int my_proc_id = Actor::self()->getPid();
302     TRACE_smpi_testing_in(my_proc_id);
303
304     int flag = Request::test(&request, &status);
305
306     XBT_DEBUG("MPI_Test result: %d", flag);
307     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
308     get_reqq_self()->push_back(request);
309
310     TRACE_smpi_testing_out(my_proc_id);
311   }
312   log_timed_action (action, clock);
313 }
314
315 static void action_waitall(simgrid::xbt::ReplayAction& action)
316 {
317   CHECK_ACTION_PARAMS(action, 0, 0)
318   double clock = smpi_process()->simulated_elapsed();
319   const unsigned int count_requests = get_reqq_self()->size();
320
321   if (count_requests>0) {
322     MPI_Status status[count_requests];
323
324     int my_proc_id_traced = Actor::self()->getPid();
325     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
326                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
327     int recvs_snd[count_requests];
328     int recvs_rcv[count_requests];
329     for (unsigned int i = 0; i < count_requests; i++) {
330       const auto& req = (*get_reqq_self())[i];
331       if (req && (req->flags() & RECV)) {
332         recvs_snd[i] = req->src();
333         recvs_rcv[i] = req->dst();
334       } else
335         recvs_snd[i] = -100;
336    }
337    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
338
339    for (unsigned i = 0; i < count_requests; i++) {
340      if (recvs_snd[i]!=-100)
341        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
342    }
343    TRACE_smpi_comm_out(my_proc_id_traced);
344   }
345   log_timed_action (action, clock);
346 }
347
348 static void action_barrier(simgrid::xbt::ReplayAction& action)
349 {
350   double clock = smpi_process()->simulated_elapsed();
351   int my_proc_id = Actor::self()->getPid();
352   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
353
354   Colls::barrier(MPI_COMM_WORLD);
355
356   TRACE_smpi_comm_out(my_proc_id);
357   log_timed_action (action, clock);
358 }
359
360 static void action_bcast(simgrid::xbt::ReplayAction& action)
361 {
362   CHECK_ACTION_PARAMS(action, 1, 2)
363   double size = parse_double(action[2]);
364   double clock = smpi_process()->simulated_elapsed();
365   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
366   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
367   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
368
369   int my_proc_id = Actor::self()->getPid();
370   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
371                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
372                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
373
374   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
375
376   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
377
378   TRACE_smpi_comm_out(my_proc_id);
379   log_timed_action (action, clock);
380 }
381
382 static void action_reduce(simgrid::xbt::ReplayAction& action)
383 {
384   CHECK_ACTION_PARAMS(action, 2, 2)
385   double comm_size = parse_double(action[2]);
386   double comp_size = parse_double(action[3]);
387   double clock = smpi_process()->simulated_elapsed();
388   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
389
390   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
391
392   int my_proc_id = Actor::self()->getPid();
393   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
394                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
395                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
396
397   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
398   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
399   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
400   smpi_execute_flops(comp_size);
401
402   TRACE_smpi_comm_out(my_proc_id);
403   log_timed_action (action, clock);
404 }
405
406 static void action_allReduce(simgrid::xbt::ReplayAction& action)
407 {
408   CHECK_ACTION_PARAMS(action, 2, 1)
409   double comm_size = parse_double(action[2]);
410   double comp_size = parse_double(action[3]);
411
412   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
413
414   double clock = smpi_process()->simulated_elapsed();
415   int my_proc_id = Actor::self()->getPid();
416   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
417                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
418
419   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
420   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
421   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
422   smpi_execute_flops(comp_size);
423
424   TRACE_smpi_comm_out(my_proc_id);
425   log_timed_action (action, clock);
426 }
427
428 static void action_allToAll(simgrid::xbt::ReplayAction& action)
429 {
430   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
431   double clock = smpi_process()->simulated_elapsed();
432   unsigned long comm_size = MPI_COMM_WORLD->size();
433   int send_size = parse_double(action[2]);
434   int recv_size = parse_double(action[3]);
435   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
436   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
437
438   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
439   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
440
441   int my_proc_id = Actor::self()->getPid();
442   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
443                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
444                                                     Datatype::encode(MPI_CURRENT_TYPE),
445                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
446
447   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
448
449   TRACE_smpi_comm_out(my_proc_id);
450   log_timed_action (action, clock);
451 }
452
453 static void action_gather(simgrid::xbt::ReplayAction& action)
454 {
455   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
456         0 gather 68 68 0 0 0
457       where:
458         1) 68 is the sendcounts
459         2) 68 is the recvcounts
460         3) 0 is the root node
461         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
462         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
463   */
464   CHECK_ACTION_PARAMS(action, 2, 3)
465   double clock = smpi_process()->simulated_elapsed();
466   unsigned long comm_size = MPI_COMM_WORLD->size();
467   int send_size = parse_double(action[2]);
468   int recv_size = parse_double(action[3]);
469   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
470   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
471
472   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
473   void *recv = nullptr;
474   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
475   int rank = MPI_COMM_WORLD->rank();
476
477   if(rank==root)
478     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
479
480   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
481                                                                         Datatype::encode(MPI_CURRENT_TYPE),
482                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
483
484   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
485
486   TRACE_smpi_comm_out(Actor::self()->getPid());
487   log_timed_action (action, clock);
488 }
489
490 static void action_scatter(simgrid::xbt::ReplayAction& action)
491 {
492   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
493         0 gather 68 68 0 0 0
494       where:
495         1) 68 is the sendcounts
496         2) 68 is the recvcounts
497         3) 0 is the root node
498         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
499         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
500   */
501   CHECK_ACTION_PARAMS(action, 2, 3)
502   double clock                   = smpi_process()->simulated_elapsed();
503   unsigned long comm_size        = MPI_COMM_WORLD->size();
504   int send_size                  = parse_double(action[2]);
505   int recv_size                  = parse_double(action[3]);
506   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
507   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
508
509   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
510   void* recv = nullptr;
511   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
512   int rank = MPI_COMM_WORLD->rank();
513
514   if (rank == root)
515     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
516
517   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
518                                                                         Datatype::encode(MPI_CURRENT_TYPE),
519                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
520
521   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
522
523   TRACE_smpi_comm_out(Actor::self()->getPid());
524   log_timed_action(action, clock);
525 }
526
527 static void action_gatherv(simgrid::xbt::ReplayAction& action)
528 {
529   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
530        0 gather 68 68 10 10 10 0 0 0
531      where:
532        1) 68 is the sendcount
533        2) 68 10 10 10 is the recvcounts
534        3) 0 is the root node
535        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
536        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
537   */
538   double clock = smpi_process()->simulated_elapsed();
539   unsigned long comm_size = MPI_COMM_WORLD->size();
540   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
541   int send_size = parse_double(action[2]);
542   std::vector<int> disps(comm_size, 0);
543   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
544
545   MPI_Datatype MPI_CURRENT_TYPE =
546       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
547   MPI_Datatype MPI_CURRENT_TYPE2{
548       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
549
550   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
551   void *recv = nullptr;
552   for (unsigned int i = 0; i < comm_size; i++) {
553     (*recvcounts)[i] = std::stoi(action[i + 3]);
554   }
555   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
556
557   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
558   int rank = MPI_COMM_WORLD->rank();
559
560   if(rank==root)
561     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
562
563   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
564                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
565                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
566
567   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
568                  MPI_COMM_WORLD);
569
570   TRACE_smpi_comm_out(Actor::self()->getPid());
571   log_timed_action (action, clock);
572 }
573
574 static void action_scatterv(simgrid::xbt::ReplayAction& action)
575 {
576   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
577        0 gather 68 10 10 10 68 0 0 0
578      where:
579        1) 68 10 10 10 is the sendcounts
580        2) 68 is the recvcount
581        3) 0 is the root node
582        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
583        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
584   */
585   double clock  = smpi_process()->simulated_elapsed();
586   unsigned long comm_size = MPI_COMM_WORLD->size();
587   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
588   int recv_size = parse_double(action[2 + comm_size]);
589   std::vector<int> disps(comm_size, 0);
590   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
591
592   MPI_Datatype MPI_CURRENT_TYPE =
593       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
594   MPI_Datatype MPI_CURRENT_TYPE2{
595       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
596
597   void* send = nullptr;
598   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
599   for (unsigned int i = 0; i < comm_size; i++) {
600     (*sendcounts)[i] = std::stoi(action[i + 2]);
601   }
602   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
603
604   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
605   int rank = MPI_COMM_WORLD->rank();
606
607   if (rank == root)
608     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
609
610   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
611                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
612                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
613
614   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
615                   MPI_COMM_WORLD);
616
617   TRACE_smpi_comm_out(Actor::self()->getPid());
618   log_timed_action(action, clock);
619 }
620
621 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
622 {
623   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
624        0 reduceScatter 275427 275427 275427 204020 11346849 0
625      where:
626        1) The first four values after the name of the action declare the recvcounts array
627        2) The value 11346849 is the amount of instructions
628        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
629  */
630   double clock = smpi_process()->simulated_elapsed();
631   unsigned long comm_size = MPI_COMM_WORLD->size();
632   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
633   int comp_size = parse_double(action[2+comm_size]);
634   int my_proc_id                     = Actor::self()->getPid();
635   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
636   MPI_Datatype MPI_CURRENT_TYPE =
637       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
638
639   for (unsigned int i = 0; i < comm_size; i++) {
640     recvcounts->push_back(std::stoi(action[i + 2]));
641   }
642   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
643
644   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
645                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
646                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
647                                                        Datatype::encode(MPI_CURRENT_TYPE)));
648
649   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
650   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
651
652   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
653   smpi_execute_flops(comp_size);
654
655   TRACE_smpi_comm_out(my_proc_id);
656   log_timed_action (action, clock);
657 }
658
659 static void action_allgather(simgrid::xbt::ReplayAction& action)
660 {
661   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
662         0 allGather 275427 275427
663     where:
664         1) 275427 is the sendcount
665         2) 275427 is the recvcount
666         3) No more values mean that the datatype for sent and receive buffer is the default one, see
667     simgrid::smpi::Datatype::decode().
668   */
669   double clock = smpi_process()->simulated_elapsed();
670
671   CHECK_ACTION_PARAMS(action, 2, 2)
672   int sendcount = std::stoi(action[2]);
673   int recvcount = std::stoi(action[3]);
674
675   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
676   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
677
678   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
679   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
680
681   int my_proc_id = Actor::self()->getPid();
682
683   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
684                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
685                                                     Datatype::encode(MPI_CURRENT_TYPE),
686                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
687
688   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
689
690   TRACE_smpi_comm_out(my_proc_id);
691   log_timed_action (action, clock);
692 }
693
694 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
695 {
696   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
697         0 allGatherV 275427 275427 275427 275427 204020
698      where:
699         1) 275427 is the sendcount
700         2) The next four elements declare the recvcounts array
701         3) No more values mean that the datatype for sent and receive buffer is the default one, see
702      simgrid::smpi::Datatype::decode().
703   */
704   double clock = smpi_process()->simulated_elapsed();
705
706   unsigned long comm_size = MPI_COMM_WORLD->size();
707   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
708   int sendcount = std::stoi(action[2]);
709   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
710   std::vector<int> disps(comm_size, 0);
711
712   int datatype_index = 0, disp_index = 0;
713   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
714     datatype_index = 3 + comm_size;
715     disp_index     = datatype_index + 1;
716   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
717     datatype_index = -1;
718     disp_index     = 3 + comm_size;
719   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
720     datatype_index = 3 + comm_size;
721   }
722
723   if (disp_index != 0) {
724     for (unsigned int i = 0; i < comm_size; i++)
725       disps[i]          = std::stoi(action[disp_index + i]);
726   }
727
728   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
729                                                      : MPI_DEFAULT_TYPE};
730   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
731                                                       : MPI_DEFAULT_TYPE};
732
733   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
734
735   for (unsigned int i = 0; i < comm_size; i++) {
736     (*recvcounts)[i] = std::stoi(action[i + 3]);
737   }
738   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
739   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
740
741   int my_proc_id = Actor::self()->getPid();
742
743   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
744                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
745                                                        Datatype::encode(MPI_CURRENT_TYPE),
746                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
747
748   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
749                     MPI_COMM_WORLD);
750
751   TRACE_smpi_comm_out(my_proc_id);
752   log_timed_action (action, clock);
753 }
754
755 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
756 {
757   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
758         0 allToAllV 100 1 7 10 12 100 1 70 10 5
759      where:
760         1) 100 is the size of the send buffer *sizeof(int),
761         2) 1 7 10 12 is the sendcounts array
762         3) 100*sizeof(int) is the size of the receiver buffer
763         4)  1 70 10 5 is the recvcounts array
764   */
765   double clock = smpi_process()->simulated_elapsed();
766
767   unsigned long comm_size = MPI_COMM_WORLD->size();
768   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
769   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
770   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
771   std::vector<int> senddisps(comm_size, 0);
772   std::vector<int> recvdisps(comm_size, 0);
773
774   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
775                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
776                                       : MPI_DEFAULT_TYPE;
777   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
778                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
779                                      : MPI_DEFAULT_TYPE};
780
781   int send_buf_size=parse_double(action[2]);
782   int recv_buf_size=parse_double(action[3+comm_size]);
783   int my_proc_id = Actor::self()->getPid();
784   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
785   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
786
787   for (unsigned int i = 0; i < comm_size; i++) {
788     (*sendcounts)[i] = std::stoi(action[3 + i]);
789     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
790   }
791   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
792   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
793
794   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
795                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
796                                                        Datatype::encode(MPI_CURRENT_TYPE),
797                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
798
799   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
800                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
801
802   TRACE_smpi_comm_out(my_proc_id);
803   log_timed_action (action, clock);
804 }
805
806 }} // namespace simgrid::smpi
807
808 /** @brief Only initialize the replay, don't do it for real */
809 void smpi_replay_init(int* argc, char*** argv)
810 {
811   simgrid::smpi::Process::init(argc, argv);
812   smpi_process()->mark_as_initialized();
813   smpi_process()->set_replaying(true);
814
815   int my_proc_id = Actor::self()->getPid();
816   TRACE_smpi_init(my_proc_id);
817   TRACE_smpi_computing_init(my_proc_id);
818   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
819   TRACE_smpi_comm_out(my_proc_id);
820   xbt_replay_action_register("init",       simgrid::smpi::action_init);
821   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
822   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
823   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
824   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
825   xbt_replay_action_register("send",       simgrid::smpi::action_send);
826   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
827   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
828   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
829   xbt_replay_action_register("test",       simgrid::smpi::action_test);
830   std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
831   xbt_replay_action_register("wait",
832                              std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
833   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
834   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
835   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
836   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
837   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
838   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
839   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
840   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
841   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
842   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
843   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
844   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
845   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
846   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
847   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
848
849   //if we have a delayed start, sleep here.
850   if(*argc>2){
851     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
852     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
853     smpi_execute_flops(value);
854   } else {
855     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
856     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
857     smpi_execute_flops(0.0);
858   }
859 }
860
861 /** @brief actually run the replay after initialization */
862 void smpi_replay_main(int* argc, char*** argv)
863 {
864   simgrid::xbt::replay_runner(*argc, *argv);
865
866   /* and now, finalize everything */
867   /* One active process will stop. Decrease the counter*/
868   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
869   if (not get_reqq_self()->empty()) {
870     unsigned int count_requests=get_reqq_self()->size();
871     MPI_Request requests[count_requests];
872     MPI_Status status[count_requests];
873     unsigned int i=0;
874
875     for (auto const& req : *get_reqq_self()) {
876       requests[i] = req;
877       i++;
878     }
879     simgrid::smpi::Request::waitall(count_requests, requests, status);
880   }
881   delete get_reqq_self();
882   active_processes--;
883
884   if(active_processes==0){
885     /* Last process alive speaking: end the simulated timer */
886     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
887     smpi_free_replay_tmp_buffers();
888   }
889
890   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
891
892   smpi_process()->finalize();
893
894   TRACE_smpi_comm_out(Actor::self()->getPid());
895   TRACE_smpi_finalize(Actor::self()->getPid());
896 }
897
898 /** @brief chain a replay initialization and a replay start */
899 void smpi_replay_run(int* argc, char*** argv)
900 {
901   smpi_replay_init(argc, argv);
902   smpi_replay_main(argc, argv);
903 }