Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Cleanup WaitAction a bit
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 template <class T> class ReplayAction {
104 protected:
105   const std::string name;
106   T args;
107
108   int my_proc_id;
109
110 public:
111   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
112
113   virtual void execute(simgrid::xbt::ReplayAction& action)
114   {
115     // Needs to be re-initialized for every action, hence here
116     double start_time = smpi_process()->simulated_elapsed();
117     args.parse(action);
118     kernel(action);
119     log_timed_action(action, start_time);
120   }
121
122   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
123 };
124
125 class WaitAction : public ReplayAction<ActionArgParser> {
126 public:
127   WaitAction() : ReplayAction("Wait") {}
128   void kernel(simgrid::xbt::ReplayAction& action) override
129   {
130     CHECK_ACTION_PARAMS(action, 0, 0)
131     MPI_Status status;
132
133     std::string s = boost::algorithm::join(action, " ");
134     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
135     MPI_Request request = get_reqq_self()->back();
136     get_reqq_self()->pop_back();
137
138     if (request == nullptr) {
139       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
140        * return.*/
141       return;
142     }
143
144     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
145
146     // Must be taken before Request::wait() since the request may be set to
147     // MPI_REQUEST_NULL by Request::wait!
148     int src                  = request->comm()->group()->rank(request->src());
149     int dst                  = request->comm()->group()->rank(request->dst());
150     bool is_wait_for_receive = (request->flags() & RECV);
151     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
152     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
153
154     Request::wait(&request, &status);
155
156     TRACE_smpi_comm_out(rank);
157     if (is_wait_for_receive)
158       TRACE_smpi_recv(src_traced, dst_traced, 0);
159   }
160 };
161
162 class SendAction : public ReplayAction<SendRecvParser> {
163 public:
164   SendAction() = delete;
165   SendAction(std::string name) : ReplayAction(name) {}
166   void kernel(simgrid::xbt::ReplayAction& action) override
167   {
168     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
169
170     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
171                                                                                  Datatype::encode(args.datatype1)));
172     if (not TRACE_smpi_view_internals())
173       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
174
175     if (name == "send") {
176       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
177     } else if (name == "Isend") {
178       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
179       get_reqq_self()->push_back(request);
180     } else {
181       xbt_die("Don't know this action, %s", name.c_str());
182     }
183
184     TRACE_smpi_comm_out(my_proc_id);
185   }
186 };
187
188 class RecvAction : public ReplayAction<SendRecvParser> {
189 public:
190   RecvAction() = delete;
191   explicit RecvAction(std::string name) : ReplayAction(name) {}
192   void kernel(simgrid::xbt::ReplayAction& action) override
193   {
194     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
195
196     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
197                                                                                  Datatype::encode(args.datatype1)));
198
199     MPI_Status status;
200     // unknown size from the receiver point of view
201     if (args.size <= 0.0) {
202       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
203       args.size = status.count;
204     }
205
206     if (name == "recv") {
207       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
208     } else if (name == "Irecv") {
209       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
210       get_reqq_self()->push_back(request);
211     }
212
213     TRACE_smpi_comm_out(my_proc_id);
214     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
215     if (name == "recv" && not TRACE_smpi_view_internals()) {
216       TRACE_smpi_recv(src_traced, my_proc_id, 0);
217     }
218   }
219 };
220
221 class ComputeAction : public ReplayAction<ComputeParser> {
222 public:
223   ComputeAction() : ReplayAction("compute") {}
224   void kernel(simgrid::xbt::ReplayAction& action) override
225   {
226     TRACE_smpi_computing_in(my_proc_id, args.flops);
227     smpi_execute_flops(args.flops);
228     TRACE_smpi_computing_out(my_proc_id);
229   }
230 };
231
232 class TestAction : public ReplayAction<ActionArgParser> {
233 public:
234   TestAction() : ReplayAction("Test") {}
235   void kernel(simgrid::xbt::ReplayAction& action) override
236   {
237     MPI_Request request = get_reqq_self()->back();
238     get_reqq_self()->pop_back();
239     // if request is null here, this may mean that a previous test has succeeded
240     // Different times in traced application and replayed version may lead to this
241     // In this case, ignore the extra calls.
242     if (request != nullptr) {
243       TRACE_smpi_testing_in(my_proc_id);
244
245       MPI_Status status;
246       int flag = Request::test(&request, &status);
247
248       XBT_DEBUG("MPI_Test result: %d", flag);
249       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
250        * nullptr.*/
251       get_reqq_self()->push_back(request);
252
253       TRACE_smpi_testing_out(my_proc_id);
254     }
255   }
256 };
257
258 } // Replay Namespace
259
260 static void action_init(simgrid::xbt::ReplayAction& action)
261 {
262   XBT_DEBUG("Initialize the counters");
263   CHECK_ACTION_PARAMS(action, 0, 1)
264   if (action.size() > 2)
265     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
266   else
267     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
268
269   /* start a simulated timer */
270   smpi_process()->simulated_start();
271   /*initialize the number of active processes */
272   active_processes = smpi_process_count();
273
274   set_reqq_self(new std::vector<MPI_Request>);
275 }
276
277 static void action_finalize(simgrid::xbt::ReplayAction& action)
278 {
279   /* Nothing to do */
280 }
281
282 static void action_comm_size(simgrid::xbt::ReplayAction& action)
283 {
284   log_timed_action (action, smpi_process()->simulated_elapsed());
285 }
286
287 static void action_comm_split(simgrid::xbt::ReplayAction& action)
288 {
289   log_timed_action (action, smpi_process()->simulated_elapsed());
290 }
291
292 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
293 {
294   log_timed_action (action, smpi_process()->simulated_elapsed());
295 }
296
297 static void action_waitall(simgrid::xbt::ReplayAction& action)
298 {
299   CHECK_ACTION_PARAMS(action, 0, 0)
300   double clock = smpi_process()->simulated_elapsed();
301   const unsigned int count_requests = get_reqq_self()->size();
302
303   if (count_requests>0) {
304     MPI_Status status[count_requests];
305
306     int my_proc_id_traced = Actor::self()->getPid();
307     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
308                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
309     int recvs_snd[count_requests];
310     int recvs_rcv[count_requests];
311     for (unsigned int i = 0; i < count_requests; i++) {
312       const auto& req = (*get_reqq_self())[i];
313       if (req && (req->flags() & RECV)) {
314         recvs_snd[i] = req->src();
315         recvs_rcv[i] = req->dst();
316       } else
317         recvs_snd[i] = -100;
318    }
319    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
320
321    for (unsigned i = 0; i < count_requests; i++) {
322      if (recvs_snd[i]!=-100)
323        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
324    }
325    TRACE_smpi_comm_out(my_proc_id_traced);
326   }
327   log_timed_action (action, clock);
328 }
329
330 static void action_barrier(simgrid::xbt::ReplayAction& action)
331 {
332   double clock = smpi_process()->simulated_elapsed();
333   int my_proc_id = Actor::self()->getPid();
334   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
335
336   Colls::barrier(MPI_COMM_WORLD);
337
338   TRACE_smpi_comm_out(my_proc_id);
339   log_timed_action (action, clock);
340 }
341
342 static void action_bcast(simgrid::xbt::ReplayAction& action)
343 {
344   CHECK_ACTION_PARAMS(action, 1, 2)
345   double size = parse_double(action[2]);
346   double clock = smpi_process()->simulated_elapsed();
347   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
348   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
349   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
350
351   int my_proc_id = Actor::self()->getPid();
352   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
353                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
354                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
355
356   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
357
358   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
359
360   TRACE_smpi_comm_out(my_proc_id);
361   log_timed_action (action, clock);
362 }
363
364 static void action_reduce(simgrid::xbt::ReplayAction& action)
365 {
366   CHECK_ACTION_PARAMS(action, 2, 2)
367   double comm_size = parse_double(action[2]);
368   double comp_size = parse_double(action[3]);
369   double clock = smpi_process()->simulated_elapsed();
370   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
371
372   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
373
374   int my_proc_id = Actor::self()->getPid();
375   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
376                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
377                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
378
379   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
380   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
381   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
382   smpi_execute_flops(comp_size);
383
384   TRACE_smpi_comm_out(my_proc_id);
385   log_timed_action (action, clock);
386 }
387
388 static void action_allReduce(simgrid::xbt::ReplayAction& action)
389 {
390   CHECK_ACTION_PARAMS(action, 2, 1)
391   double comm_size = parse_double(action[2]);
392   double comp_size = parse_double(action[3]);
393
394   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
395
396   double clock = smpi_process()->simulated_elapsed();
397   int my_proc_id = Actor::self()->getPid();
398   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
399                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
400
401   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
402   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
403   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
404   smpi_execute_flops(comp_size);
405
406   TRACE_smpi_comm_out(my_proc_id);
407   log_timed_action (action, clock);
408 }
409
410 static void action_allToAll(simgrid::xbt::ReplayAction& action)
411 {
412   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
413   double clock = smpi_process()->simulated_elapsed();
414   unsigned long comm_size = MPI_COMM_WORLD->size();
415   int send_size = parse_double(action[2]);
416   int recv_size = parse_double(action[3]);
417   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
418   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
419
420   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
421   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
422
423   int my_proc_id = Actor::self()->getPid();
424   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
425                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
426                                                     Datatype::encode(MPI_CURRENT_TYPE),
427                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
428
429   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
430
431   TRACE_smpi_comm_out(my_proc_id);
432   log_timed_action (action, clock);
433 }
434
435 static void action_gather(simgrid::xbt::ReplayAction& action)
436 {
437   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
438         0 gather 68 68 0 0 0
439       where:
440         1) 68 is the sendcounts
441         2) 68 is the recvcounts
442         3) 0 is the root node
443         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
444         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
445   */
446   CHECK_ACTION_PARAMS(action, 2, 3)
447   double clock = smpi_process()->simulated_elapsed();
448   unsigned long comm_size = MPI_COMM_WORLD->size();
449   int send_size = parse_double(action[2]);
450   int recv_size = parse_double(action[3]);
451   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
452   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
453
454   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
455   void *recv = nullptr;
456   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
457   int rank = MPI_COMM_WORLD->rank();
458
459   if(rank==root)
460     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
461
462   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
463                                                                         Datatype::encode(MPI_CURRENT_TYPE),
464                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
465
466   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
467
468   TRACE_smpi_comm_out(Actor::self()->getPid());
469   log_timed_action (action, clock);
470 }
471
472 static void action_scatter(simgrid::xbt::ReplayAction& action)
473 {
474   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
475         0 gather 68 68 0 0 0
476       where:
477         1) 68 is the sendcounts
478         2) 68 is the recvcounts
479         3) 0 is the root node
480         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
481         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
482   */
483   CHECK_ACTION_PARAMS(action, 2, 3)
484   double clock                   = smpi_process()->simulated_elapsed();
485   unsigned long comm_size        = MPI_COMM_WORLD->size();
486   int send_size                  = parse_double(action[2]);
487   int recv_size                  = parse_double(action[3]);
488   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
489   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
490
491   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
492   void* recv = nullptr;
493   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
494   int rank = MPI_COMM_WORLD->rank();
495
496   if (rank == root)
497     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
498
499   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
500                                                                         Datatype::encode(MPI_CURRENT_TYPE),
501                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
502
503   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
504
505   TRACE_smpi_comm_out(Actor::self()->getPid());
506   log_timed_action(action, clock);
507 }
508
509 static void action_gatherv(simgrid::xbt::ReplayAction& action)
510 {
511   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
512        0 gather 68 68 10 10 10 0 0 0
513      where:
514        1) 68 is the sendcount
515        2) 68 10 10 10 is the recvcounts
516        3) 0 is the root node
517        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
518        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
519   */
520   double clock = smpi_process()->simulated_elapsed();
521   unsigned long comm_size = MPI_COMM_WORLD->size();
522   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
523   int send_size = parse_double(action[2]);
524   std::vector<int> disps(comm_size, 0);
525   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
526
527   MPI_Datatype MPI_CURRENT_TYPE =
528       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
529   MPI_Datatype MPI_CURRENT_TYPE2{
530       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
531
532   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
533   void *recv = nullptr;
534   for (unsigned int i = 0; i < comm_size; i++) {
535     (*recvcounts)[i] = std::stoi(action[i + 3]);
536   }
537   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
538
539   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
540   int rank = MPI_COMM_WORLD->rank();
541
542   if(rank==root)
543     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
544
545   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
546                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
547                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
548
549   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
550                  MPI_COMM_WORLD);
551
552   TRACE_smpi_comm_out(Actor::self()->getPid());
553   log_timed_action (action, clock);
554 }
555
556 static void action_scatterv(simgrid::xbt::ReplayAction& action)
557 {
558   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
559        0 gather 68 10 10 10 68 0 0 0
560      where:
561        1) 68 10 10 10 is the sendcounts
562        2) 68 is the recvcount
563        3) 0 is the root node
564        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
565        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
566   */
567   double clock  = smpi_process()->simulated_elapsed();
568   unsigned long comm_size = MPI_COMM_WORLD->size();
569   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
570   int recv_size = parse_double(action[2 + comm_size]);
571   std::vector<int> disps(comm_size, 0);
572   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
573
574   MPI_Datatype MPI_CURRENT_TYPE =
575       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
576   MPI_Datatype MPI_CURRENT_TYPE2{
577       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
578
579   void* send = nullptr;
580   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
581   for (unsigned int i = 0; i < comm_size; i++) {
582     (*sendcounts)[i] = std::stoi(action[i + 2]);
583   }
584   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
585
586   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
587   int rank = MPI_COMM_WORLD->rank();
588
589   if (rank == root)
590     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
591
592   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
593                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
594                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
595
596   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
597                   MPI_COMM_WORLD);
598
599   TRACE_smpi_comm_out(Actor::self()->getPid());
600   log_timed_action(action, clock);
601 }
602
603 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
604 {
605   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
606        0 reduceScatter 275427 275427 275427 204020 11346849 0
607      where:
608        1) The first four values after the name of the action declare the recvcounts array
609        2) The value 11346849 is the amount of instructions
610        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
611  */
612   double clock = smpi_process()->simulated_elapsed();
613   unsigned long comm_size = MPI_COMM_WORLD->size();
614   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
615   int comp_size = parse_double(action[2+comm_size]);
616   int my_proc_id                     = Actor::self()->getPid();
617   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
618   MPI_Datatype MPI_CURRENT_TYPE =
619       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
620
621   for (unsigned int i = 0; i < comm_size; i++) {
622     recvcounts->push_back(std::stoi(action[i + 2]));
623   }
624   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
625
626   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
627                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
628                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
629                                                        Datatype::encode(MPI_CURRENT_TYPE)));
630
631   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
632   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
633
634   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
635   smpi_execute_flops(comp_size);
636
637   TRACE_smpi_comm_out(my_proc_id);
638   log_timed_action (action, clock);
639 }
640
641 static void action_allgather(simgrid::xbt::ReplayAction& action)
642 {
643   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
644         0 allGather 275427 275427
645     where:
646         1) 275427 is the sendcount
647         2) 275427 is the recvcount
648         3) No more values mean that the datatype for sent and receive buffer is the default one, see
649     simgrid::smpi::Datatype::decode().
650   */
651   double clock = smpi_process()->simulated_elapsed();
652
653   CHECK_ACTION_PARAMS(action, 2, 2)
654   int sendcount = std::stoi(action[2]);
655   int recvcount = std::stoi(action[3]);
656
657   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
658   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
659
660   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
661   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
662
663   int my_proc_id = Actor::self()->getPid();
664
665   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
666                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
667                                                     Datatype::encode(MPI_CURRENT_TYPE),
668                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
669
670   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
671
672   TRACE_smpi_comm_out(my_proc_id);
673   log_timed_action (action, clock);
674 }
675
676 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
677 {
678   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
679         0 allGatherV 275427 275427 275427 275427 204020
680      where:
681         1) 275427 is the sendcount
682         2) The next four elements declare the recvcounts array
683         3) No more values mean that the datatype for sent and receive buffer is the default one, see
684      simgrid::smpi::Datatype::decode().
685   */
686   double clock = smpi_process()->simulated_elapsed();
687
688   unsigned long comm_size = MPI_COMM_WORLD->size();
689   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
690   int sendcount = std::stoi(action[2]);
691   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
692   std::vector<int> disps(comm_size, 0);
693
694   int datatype_index = 0, disp_index = 0;
695   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
696     datatype_index = 3 + comm_size;
697     disp_index     = datatype_index + 1;
698   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
699     datatype_index = -1;
700     disp_index     = 3 + comm_size;
701   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
702     datatype_index = 3 + comm_size;
703   }
704
705   if (disp_index != 0) {
706     for (unsigned int i = 0; i < comm_size; i++)
707       disps[i]          = std::stoi(action[disp_index + i]);
708   }
709
710   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
711                                                      : MPI_DEFAULT_TYPE};
712   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
713                                                       : MPI_DEFAULT_TYPE};
714
715   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
716
717   for (unsigned int i = 0; i < comm_size; i++) {
718     (*recvcounts)[i] = std::stoi(action[i + 3]);
719   }
720   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
721   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
722
723   int my_proc_id = Actor::self()->getPid();
724
725   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
726                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
727                                                        Datatype::encode(MPI_CURRENT_TYPE),
728                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
729
730   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
731                     MPI_COMM_WORLD);
732
733   TRACE_smpi_comm_out(my_proc_id);
734   log_timed_action (action, clock);
735 }
736
737 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
738 {
739   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
740         0 allToAllV 100 1 7 10 12 100 1 70 10 5
741      where:
742         1) 100 is the size of the send buffer *sizeof(int),
743         2) 1 7 10 12 is the sendcounts array
744         3) 100*sizeof(int) is the size of the receiver buffer
745         4)  1 70 10 5 is the recvcounts array
746   */
747   double clock = smpi_process()->simulated_elapsed();
748
749   unsigned long comm_size = MPI_COMM_WORLD->size();
750   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
751   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
752   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
753   std::vector<int> senddisps(comm_size, 0);
754   std::vector<int> recvdisps(comm_size, 0);
755
756   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
757                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
758                                       : MPI_DEFAULT_TYPE;
759   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
760                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
761                                      : MPI_DEFAULT_TYPE};
762
763   int send_buf_size=parse_double(action[2]);
764   int recv_buf_size=parse_double(action[3+comm_size]);
765   int my_proc_id = Actor::self()->getPid();
766   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
767   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
768
769   for (unsigned int i = 0; i < comm_size; i++) {
770     (*sendcounts)[i] = std::stoi(action[3 + i]);
771     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
772   }
773   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
774   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
775
776   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
777                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
778                                                        Datatype::encode(MPI_CURRENT_TYPE),
779                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
780
781   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
782                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
783
784   TRACE_smpi_comm_out(my_proc_id);
785   log_timed_action (action, clock);
786 }
787
788 }} // namespace simgrid::smpi
789
790 /** @brief Only initialize the replay, don't do it for real */
791 void smpi_replay_init(int* argc, char*** argv)
792 {
793   simgrid::smpi::Process::init(argc, argv);
794   smpi_process()->mark_as_initialized();
795   smpi_process()->set_replaying(true);
796
797   int my_proc_id = Actor::self()->getPid();
798   TRACE_smpi_init(my_proc_id);
799   TRACE_smpi_computing_init(my_proc_id);
800   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
801   TRACE_smpi_comm_out(my_proc_id);
802   xbt_replay_action_register("init",       simgrid::smpi::action_init);
803   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
804   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
805   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
806   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
807
808   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
809   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
810   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
811   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
812   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
813   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
814   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
815   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
816   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
817   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
818   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
819   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
820   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
821   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
822   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
823   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
824   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
825   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
826   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
827   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
828   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
829
830   //if we have a delayed start, sleep here.
831   if(*argc>2){
832     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
833     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
834     smpi_execute_flops(value);
835   } else {
836     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
837     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
838     smpi_execute_flops(0.0);
839   }
840 }
841
842 /** @brief actually run the replay after initialization */
843 void smpi_replay_main(int* argc, char*** argv)
844 {
845   simgrid::xbt::replay_runner(*argc, *argv);
846
847   /* and now, finalize everything */
848   /* One active process will stop. Decrease the counter*/
849   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
850   if (not get_reqq_self()->empty()) {
851     unsigned int count_requests=get_reqq_self()->size();
852     MPI_Request requests[count_requests];
853     MPI_Status status[count_requests];
854     unsigned int i=0;
855
856     for (auto const& req : *get_reqq_self()) {
857       requests[i] = req;
858       i++;
859     }
860     simgrid::smpi::Request::waitall(count_requests, requests, status);
861   }
862   delete get_reqq_self();
863   active_processes--;
864
865   if(active_processes==0){
866     /* Last process alive speaking: end the simulated timer */
867     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
868     smpi_free_replay_tmp_buffers();
869   }
870
871   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
872
873   smpi_process()->finalize();
874
875   TRACE_smpi_comm_out(Actor::self()->getPid());
876   TRACE_smpi_finalize(Actor::self()->getPid());
877 }
878
879 /** @brief chain a replay initialization and a replay start */
880 void smpi_replay_run(int* argc, char*** argv)
881 {
882   smpi_replay_init(argc, argv);
883   smpi_replay_main(argc, argv);
884 }