Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Cleanup WaitAction a bit
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 template <class T> class ReplayAction {
104 protected:
105   const std::string name;
106   T args;
107
108   int my_proc_id;
109
110 public:
111   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
112
113   virtual void execute(simgrid::xbt::ReplayAction& action)
114   {
115     // Needs to be re-initialized for every action, hence here
116     double start_time = smpi_process()->simulated_elapsed();
117     args.parse(action);
118     kernel(action);
119     if (name != "Init")
120       log_timed_action(action, start_time);
121   }
122
123   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
124 };
125
126 class WaitAction : public ReplayAction<ActionArgParser> {
127 public:
128   WaitAction() : ReplayAction("Wait") {}
129   void kernel(simgrid::xbt::ReplayAction& action) override
130   {
131     std::string s = boost::algorithm::join(action, " ");
132     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
133     MPI_Request request = get_reqq_self()->back();
134     get_reqq_self()->pop_back();
135
136     if (request == nullptr) {
137       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
138        * return.*/
139       return;
140     }
141
142     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
143
144     // Must be taken before Request::wait() since the request may be set to
145     // MPI_REQUEST_NULL by Request::wait!
146     int src                  = request->comm()->group()->rank(request->src());
147     int dst                  = request->comm()->group()->rank(request->dst());
148     bool is_wait_for_receive = (request->flags() & RECV);
149     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
150     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
151
152     MPI_Status status;
153     Request::wait(&request, &status);
154
155     TRACE_smpi_comm_out(rank);
156     if (is_wait_for_receive)
157       TRACE_smpi_recv(src, dst, 0);
158   }
159 };
160
161 class SendAction : public ReplayAction<SendRecvParser> {
162 public:
163   SendAction() = delete;
164   SendAction(std::string name) : ReplayAction(name) {}
165   void kernel(simgrid::xbt::ReplayAction& action) override
166   {
167     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
168
169     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
170                                                                                  Datatype::encode(args.datatype1)));
171     if (not TRACE_smpi_view_internals())
172       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
173
174     if (name == "send") {
175       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
176     } else if (name == "Isend") {
177       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
178       get_reqq_self()->push_back(request);
179     } else {
180       xbt_die("Don't know this action, %s", name.c_str());
181     }
182
183     TRACE_smpi_comm_out(my_proc_id);
184   }
185 };
186
187 class RecvAction : public ReplayAction<SendRecvParser> {
188 public:
189   RecvAction() = delete;
190   explicit RecvAction(std::string name) : ReplayAction(name) {}
191   void kernel(simgrid::xbt::ReplayAction& action) override
192   {
193     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
194
195     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
196                                                                                  Datatype::encode(args.datatype1)));
197
198     MPI_Status status;
199     // unknown size from the receiver point of view
200     if (args.size <= 0.0) {
201       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
202       args.size = status.count;
203     }
204
205     if (name == "recv") {
206       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
207     } else if (name == "Irecv") {
208       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
209       get_reqq_self()->push_back(request);
210     }
211
212     TRACE_smpi_comm_out(my_proc_id);
213     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
214     if (name == "recv" && not TRACE_smpi_view_internals()) {
215       TRACE_smpi_recv(src_traced, my_proc_id, 0);
216     }
217   }
218 };
219
220 class ComputeAction : public ReplayAction<ComputeParser> {
221 public:
222   ComputeAction() : ReplayAction("compute") {}
223   void kernel(simgrid::xbt::ReplayAction& action) override
224   {
225     TRACE_smpi_computing_in(my_proc_id, args.flops);
226     smpi_execute_flops(args.flops);
227     TRACE_smpi_computing_out(my_proc_id);
228   }
229 };
230
231 class TestAction : public ReplayAction<ActionArgParser> {
232 public:
233   TestAction() : ReplayAction("Test") {}
234   void kernel(simgrid::xbt::ReplayAction& action) override
235   {
236     MPI_Request request = get_reqq_self()->back();
237     get_reqq_self()->pop_back();
238     // if request is null here, this may mean that a previous test has succeeded
239     // Different times in traced application and replayed version may lead to this
240     // In this case, ignore the extra calls.
241     if (request != nullptr) {
242       TRACE_smpi_testing_in(my_proc_id);
243
244       MPI_Status status;
245       int flag = Request::test(&request, &status);
246
247       XBT_DEBUG("MPI_Test result: %d", flag);
248       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
249        * nullptr.*/
250       get_reqq_self()->push_back(request);
251
252       TRACE_smpi_testing_out(my_proc_id);
253     }
254   }
255 };
256
257 class InitAction : public ReplayAction<ActionArgParser> {
258 public:
259   InitAction() : ReplayAction("Init") {}
260   void kernel(simgrid::xbt::ReplayAction& action) override
261   {
262     CHECK_ACTION_PARAMS(action, 0, 1)
263     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
264                                            : MPI_BYTE;  // default TAU datatype
265
266     /* start a simulated timer */
267     smpi_process()->simulated_start();
268     /*initialize the number of active processes */
269     active_processes = smpi_process_count();
270
271     set_reqq_self(new std::vector<MPI_Request>);
272   }
273 };
274
275 class CommunicatorAction : public ReplayAction<ActionArgParser> {
276 public:
277   CommunicatorAction() : ReplayAction("Comm") {}
278   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
279 };
280
281 class WaitAllAction : public ReplayAction<ActionArgParser> {
282 public:
283   WaitAllAction() : ReplayAction("waitAll") {}
284   void kernel(simgrid::xbt::ReplayAction& action) override
285   {
286     const unsigned int count_requests = get_reqq_self()->size();
287
288     if (count_requests > 0) {
289       TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
290                          new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
291       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
292       for (const auto& req : (*get_reqq_self())) {
293         if (req && (req->flags() & RECV)) {
294           sender_receiver.push_back({req->src(), req->dst()});
295         }
296       }
297       MPI_Status status[count_requests];
298       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
299
300       for (auto& pair : sender_receiver) {
301         TRACE_smpi_recv(pair.first, pair.second, 0);
302       }
303       TRACE_smpi_comm_out(my_proc_id);
304     }
305   }
306 };
307
308 class BarrierAction : public ReplayAction<ActionArgParser> {
309 public:
310   BarrierAction() : ReplayAction("barrier") {}
311   void kernel(simgrid::xbt::ReplayAction& action) override
312   {
313     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
314     Colls::barrier(MPI_COMM_WORLD);
315     TRACE_smpi_comm_out(my_proc_id);
316   }
317 };
318
319 } // Replay Namespace
320
321 static void action_bcast(simgrid::xbt::ReplayAction& action)
322 {
323   CHECK_ACTION_PARAMS(action, 1, 2)
324   double size = parse_double(action[2]);
325   double clock = smpi_process()->simulated_elapsed();
326   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
327   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
328   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
329
330   int my_proc_id = Actor::self()->getPid();
331   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
332                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
333                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
334
335   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
336
337   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
338
339   TRACE_smpi_comm_out(my_proc_id);
340   log_timed_action (action, clock);
341 }
342
343 static void action_reduce(simgrid::xbt::ReplayAction& action)
344 {
345   CHECK_ACTION_PARAMS(action, 2, 2)
346   double comm_size = parse_double(action[2]);
347   double comp_size = parse_double(action[3]);
348   double clock = smpi_process()->simulated_elapsed();
349   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
350
351   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
352
353   int my_proc_id = Actor::self()->getPid();
354   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
355                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
356                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
357
358   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
359   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
360   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
361   smpi_execute_flops(comp_size);
362
363   TRACE_smpi_comm_out(my_proc_id);
364   log_timed_action (action, clock);
365 }
366
367 static void action_allReduce(simgrid::xbt::ReplayAction& action)
368 {
369   CHECK_ACTION_PARAMS(action, 2, 1)
370   double comm_size = parse_double(action[2]);
371   double comp_size = parse_double(action[3]);
372
373   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
374
375   double clock = smpi_process()->simulated_elapsed();
376   int my_proc_id = Actor::self()->getPid();
377   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
378                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
379
380   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
381   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
382   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
383   smpi_execute_flops(comp_size);
384
385   TRACE_smpi_comm_out(my_proc_id);
386   log_timed_action (action, clock);
387 }
388
389 static void action_allToAll(simgrid::xbt::ReplayAction& action)
390 {
391   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
392   double clock = smpi_process()->simulated_elapsed();
393   unsigned long comm_size = MPI_COMM_WORLD->size();
394   int send_size = parse_double(action[2]);
395   int recv_size = parse_double(action[3]);
396   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
397   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
398
399   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
400   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
401
402   int my_proc_id = Actor::self()->getPid();
403   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
404                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
405                                                     Datatype::encode(MPI_CURRENT_TYPE),
406                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
407
408   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
409
410   TRACE_smpi_comm_out(my_proc_id);
411   log_timed_action (action, clock);
412 }
413
414 static void action_gather(simgrid::xbt::ReplayAction& action)
415 {
416   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
417         0 gather 68 68 0 0 0
418       where:
419         1) 68 is the sendcounts
420         2) 68 is the recvcounts
421         3) 0 is the root node
422         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
423         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
424   */
425   CHECK_ACTION_PARAMS(action, 2, 3)
426   double clock = smpi_process()->simulated_elapsed();
427   unsigned long comm_size = MPI_COMM_WORLD->size();
428   int send_size = parse_double(action[2]);
429   int recv_size = parse_double(action[3]);
430   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
431   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
432
433   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
434   void *recv = nullptr;
435   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
436   int rank = MPI_COMM_WORLD->rank();
437
438   if(rank==root)
439     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
440
441   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
442                                                                         Datatype::encode(MPI_CURRENT_TYPE),
443                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
444
445   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
446
447   TRACE_smpi_comm_out(Actor::self()->getPid());
448   log_timed_action (action, clock);
449 }
450
451 static void action_scatter(simgrid::xbt::ReplayAction& action)
452 {
453   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
454         0 gather 68 68 0 0 0
455       where:
456         1) 68 is the sendcounts
457         2) 68 is the recvcounts
458         3) 0 is the root node
459         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
460         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
461   */
462   CHECK_ACTION_PARAMS(action, 2, 3)
463   double clock                   = smpi_process()->simulated_elapsed();
464   unsigned long comm_size        = MPI_COMM_WORLD->size();
465   int send_size                  = parse_double(action[2]);
466   int recv_size                  = parse_double(action[3]);
467   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
468   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
469
470   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
471   void* recv = nullptr;
472   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
473   int rank = MPI_COMM_WORLD->rank();
474
475   if (rank == root)
476     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
477
478   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
479                                                                         Datatype::encode(MPI_CURRENT_TYPE),
480                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
481
482   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
483
484   TRACE_smpi_comm_out(Actor::self()->getPid());
485   log_timed_action(action, clock);
486 }
487
488 static void action_gatherv(simgrid::xbt::ReplayAction& action)
489 {
490   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
491        0 gather 68 68 10 10 10 0 0 0
492      where:
493        1) 68 is the sendcount
494        2) 68 10 10 10 is the recvcounts
495        3) 0 is the root node
496        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
497        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
498   */
499   double clock = smpi_process()->simulated_elapsed();
500   unsigned long comm_size = MPI_COMM_WORLD->size();
501   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
502   int send_size = parse_double(action[2]);
503   std::vector<int> disps(comm_size, 0);
504   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
505
506   MPI_Datatype MPI_CURRENT_TYPE =
507       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
508   MPI_Datatype MPI_CURRENT_TYPE2{
509       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
510
511   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
512   void *recv = nullptr;
513   for (unsigned int i = 0; i < comm_size; i++) {
514     (*recvcounts)[i] = std::stoi(action[i + 3]);
515   }
516   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
517
518   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
519   int rank = MPI_COMM_WORLD->rank();
520
521   if(rank==root)
522     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
523
524   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
525                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
526                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
527
528   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
529                  MPI_COMM_WORLD);
530
531   TRACE_smpi_comm_out(Actor::self()->getPid());
532   log_timed_action (action, clock);
533 }
534
535 static void action_scatterv(simgrid::xbt::ReplayAction& action)
536 {
537   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
538        0 gather 68 10 10 10 68 0 0 0
539      where:
540        1) 68 10 10 10 is the sendcounts
541        2) 68 is the recvcount
542        3) 0 is the root node
543        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
544        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
545   */
546   double clock  = smpi_process()->simulated_elapsed();
547   unsigned long comm_size = MPI_COMM_WORLD->size();
548   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
549   int recv_size = parse_double(action[2 + comm_size]);
550   std::vector<int> disps(comm_size, 0);
551   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
552
553   MPI_Datatype MPI_CURRENT_TYPE =
554       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
555   MPI_Datatype MPI_CURRENT_TYPE2{
556       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
557
558   void* send = nullptr;
559   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
560   for (unsigned int i = 0; i < comm_size; i++) {
561     (*sendcounts)[i] = std::stoi(action[i + 2]);
562   }
563   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
564
565   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
566   int rank = MPI_COMM_WORLD->rank();
567
568   if (rank == root)
569     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
570
571   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
572                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
573                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
574
575   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
576                   MPI_COMM_WORLD);
577
578   TRACE_smpi_comm_out(Actor::self()->getPid());
579   log_timed_action(action, clock);
580 }
581
582 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
583 {
584   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
585        0 reduceScatter 275427 275427 275427 204020 11346849 0
586      where:
587        1) The first four values after the name of the action declare the recvcounts array
588        2) The value 11346849 is the amount of instructions
589        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
590  */
591   double clock = smpi_process()->simulated_elapsed();
592   unsigned long comm_size = MPI_COMM_WORLD->size();
593   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
594   int comp_size = parse_double(action[2+comm_size]);
595   int my_proc_id                     = Actor::self()->getPid();
596   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
597   MPI_Datatype MPI_CURRENT_TYPE =
598       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
599
600   for (unsigned int i = 0; i < comm_size; i++) {
601     recvcounts->push_back(std::stoi(action[i + 2]));
602   }
603   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
604
605   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
606                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
607                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
608                                                        Datatype::encode(MPI_CURRENT_TYPE)));
609
610   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
611   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
612
613   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
614   smpi_execute_flops(comp_size);
615
616   TRACE_smpi_comm_out(my_proc_id);
617   log_timed_action (action, clock);
618 }
619
620 static void action_allgather(simgrid::xbt::ReplayAction& action)
621 {
622   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
623         0 allGather 275427 275427
624     where:
625         1) 275427 is the sendcount
626         2) 275427 is the recvcount
627         3) No more values mean that the datatype for sent and receive buffer is the default one, see
628     simgrid::smpi::Datatype::decode().
629   */
630   double clock = smpi_process()->simulated_elapsed();
631
632   CHECK_ACTION_PARAMS(action, 2, 2)
633   int sendcount = std::stoi(action[2]);
634   int recvcount = std::stoi(action[3]);
635
636   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
637   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
638
639   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
640   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
641
642   int my_proc_id = Actor::self()->getPid();
643
644   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
645                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
646                                                     Datatype::encode(MPI_CURRENT_TYPE),
647                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
648
649   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
650
651   TRACE_smpi_comm_out(my_proc_id);
652   log_timed_action (action, clock);
653 }
654
655 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
656 {
657   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
658         0 allGatherV 275427 275427 275427 275427 204020
659      where:
660         1) 275427 is the sendcount
661         2) The next four elements declare the recvcounts array
662         3) No more values mean that the datatype for sent and receive buffer is the default one, see
663      simgrid::smpi::Datatype::decode().
664   */
665   double clock = smpi_process()->simulated_elapsed();
666
667   unsigned long comm_size = MPI_COMM_WORLD->size();
668   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
669   int sendcount = std::stoi(action[2]);
670   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
671   std::vector<int> disps(comm_size, 0);
672
673   int datatype_index = 0, disp_index = 0;
674   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
675     datatype_index = 3 + comm_size;
676     disp_index     = datatype_index + 1;
677   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
678     datatype_index = -1;
679     disp_index     = 3 + comm_size;
680   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
681     datatype_index = 3 + comm_size;
682   }
683
684   if (disp_index != 0) {
685     for (unsigned int i = 0; i < comm_size; i++)
686       disps[i]          = std::stoi(action[disp_index + i]);
687   }
688
689   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
690                                                      : MPI_DEFAULT_TYPE};
691   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
692                                                       : MPI_DEFAULT_TYPE};
693
694   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
695
696   for (unsigned int i = 0; i < comm_size; i++) {
697     (*recvcounts)[i] = std::stoi(action[i + 3]);
698   }
699   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
700   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
701
702   int my_proc_id = Actor::self()->getPid();
703
704   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
705                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
706                                                        Datatype::encode(MPI_CURRENT_TYPE),
707                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
708
709   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
710                     MPI_COMM_WORLD);
711
712   TRACE_smpi_comm_out(my_proc_id);
713   log_timed_action (action, clock);
714 }
715
716 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
717 {
718   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
719         0 allToAllV 100 1 7 10 12 100 1 70 10 5
720      where:
721         1) 100 is the size of the send buffer *sizeof(int),
722         2) 1 7 10 12 is the sendcounts array
723         3) 100*sizeof(int) is the size of the receiver buffer
724         4)  1 70 10 5 is the recvcounts array
725   */
726   double clock = smpi_process()->simulated_elapsed();
727
728   unsigned long comm_size = MPI_COMM_WORLD->size();
729   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
730   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
731   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
732   std::vector<int> senddisps(comm_size, 0);
733   std::vector<int> recvdisps(comm_size, 0);
734
735   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
736                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
737                                       : MPI_DEFAULT_TYPE;
738   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
739                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
740                                      : MPI_DEFAULT_TYPE};
741
742   int send_buf_size=parse_double(action[2]);
743   int recv_buf_size=parse_double(action[3+comm_size]);
744   int my_proc_id = Actor::self()->getPid();
745   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
746   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
747
748   for (unsigned int i = 0; i < comm_size; i++) {
749     (*sendcounts)[i] = std::stoi(action[3 + i]);
750     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
751   }
752   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
753   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
754
755   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
756                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
757                                                        Datatype::encode(MPI_CURRENT_TYPE),
758                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
759
760   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
761                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
762
763   TRACE_smpi_comm_out(my_proc_id);
764   log_timed_action (action, clock);
765 }
766
767 }} // namespace simgrid::smpi
768
769 /** @brief Only initialize the replay, don't do it for real */
770 void smpi_replay_init(int* argc, char*** argv)
771 {
772   simgrid::smpi::Process::init(argc, argv);
773   smpi_process()->mark_as_initialized();
774   smpi_process()->set_replaying(true);
775
776   int my_proc_id = Actor::self()->getPid();
777   TRACE_smpi_init(my_proc_id);
778   TRACE_smpi_computing_init(my_proc_id);
779   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
780   TRACE_smpi_comm_out(my_proc_id);
781   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
782   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
783   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
784   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
785   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
786
787   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
788   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
789   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
790   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
791   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
792   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
793   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
794   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
795   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
796   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
797   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
798   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
799   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
800   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
801   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
802   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
803   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
804   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
805   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
806   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
807   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
808
809   //if we have a delayed start, sleep here.
810   if(*argc>2){
811     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
812     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
813     smpi_execute_flops(value);
814   } else {
815     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
816     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
817     smpi_execute_flops(0.0);
818   }
819 }
820
821 /** @brief actually run the replay after initialization */
822 void smpi_replay_main(int* argc, char*** argv)
823 {
824   simgrid::xbt::replay_runner(*argc, *argv);
825
826   /* and now, finalize everything */
827   /* One active process will stop. Decrease the counter*/
828   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
829   if (not get_reqq_self()->empty()) {
830     unsigned int count_requests=get_reqq_self()->size();
831     MPI_Request requests[count_requests];
832     MPI_Status status[count_requests];
833     unsigned int i=0;
834
835     for (auto const& req : *get_reqq_self()) {
836       requests[i] = req;
837       i++;
838     }
839     simgrid::smpi::Request::waitall(count_requests, requests, status);
840   }
841   delete get_reqq_self();
842   active_processes--;
843
844   if(active_processes==0){
845     /* Last process alive speaking: end the simulated timer */
846     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
847     smpi_free_replay_tmp_buffers();
848   }
849
850   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
851
852   smpi_process()->finalize();
853
854   TRACE_smpi_comm_out(Actor::self()->getPid());
855   TRACE_smpi_finalize(Actor::self()->getPid());
856 }
857
858 /** @brief chain a replay initialization and a replay start */
859 void smpi_replay_run(int* argc, char*** argv)
860 {
861   smpi_replay_init(argc, argv);
862   smpi_replay_main(argc, argv);
863 }