Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Remove action_* functions
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 template <class T> class ReplayAction {
104 protected:
105   const std::string name;
106   T args;
107
108   int my_proc_id;
109
110 public:
111   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
112
113   virtual void execute(simgrid::xbt::ReplayAction& action)
114   {
115     // Needs to be re-initialized for every action, hence here
116     double start_time = smpi_process()->simulated_elapsed();
117     args.parse(action);
118     kernel(action);
119     log_timed_action(action, start_time);
120   }
121
122   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
123 };
124
125 class WaitAction : public ReplayAction<ActionArgParser> {
126 public:
127   WaitAction() : ReplayAction("Wait") {}
128   void kernel(simgrid::xbt::ReplayAction& action) override
129   {
130     CHECK_ACTION_PARAMS(action, 0, 0)
131     MPI_Status status;
132
133     std::string s = boost::algorithm::join(action, " ");
134     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
135     MPI_Request request = get_reqq_self()->back();
136     get_reqq_self()->pop_back();
137
138     if (request == nullptr) {
139       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
140        * return.*/
141       return;
142     }
143
144     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
145
146     MPI_Group group          = request->comm()->group();
147     int src_traced           = group->rank(request->src());
148     int dst_traced           = group->rank(request->dst());
149     bool is_wait_for_receive = (request->flags() & RECV);
150     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
151     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
152
153     Request::wait(&request, &status);
154
155     TRACE_smpi_comm_out(rank);
156     if (is_wait_for_receive)
157       TRACE_smpi_recv(src_traced, dst_traced, 0);
158   }
159 };
160
161 class SendAction : public ReplayAction<SendRecvParser> {
162 public:
163   SendAction() = delete;
164   SendAction(std::string name) : ReplayAction(name) {}
165   void kernel(simgrid::xbt::ReplayAction& action) override
166   {
167     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
168
169     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
170                                                                                  Datatype::encode(args.datatype1)));
171     if (not TRACE_smpi_view_internals())
172       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
173
174     if (name == "send") {
175       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
176     } else if (name == "Isend") {
177       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
178       get_reqq_self()->push_back(request);
179     } else {
180       xbt_die("Don't know this action, %s", name.c_str());
181     }
182
183     TRACE_smpi_comm_out(my_proc_id);
184   }
185 };
186
187 class RecvAction : public ReplayAction<SendRecvParser> {
188 public:
189   RecvAction() = delete;
190   explicit RecvAction(std::string name) : ReplayAction(name) {}
191   void kernel(simgrid::xbt::ReplayAction& action) override
192   {
193     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
194
195     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
196                                                                                  Datatype::encode(args.datatype1)));
197
198     MPI_Status status;
199     // unknown size from the receiver point of view
200     if (args.size <= 0.0) {
201       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
202       args.size = status.count;
203     }
204
205     if (name == "recv") {
206       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
207     } else if (name == "Irecv") {
208       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
209       get_reqq_self()->push_back(request);
210     }
211
212     TRACE_smpi_comm_out(my_proc_id);
213     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
214     if (name == "recv" && not TRACE_smpi_view_internals()) {
215       TRACE_smpi_recv(src_traced, my_proc_id, 0);
216     }
217   }
218 };
219
220 class ComputeAction : public ReplayAction<ComputeParser> {
221 public:
222   ComputeAction() : ReplayAction("compute") {}
223   void kernel(simgrid::xbt::ReplayAction& action) override
224   {
225     TRACE_smpi_computing_in(my_proc_id, args.flops);
226     smpi_execute_flops(args.flops);
227     TRACE_smpi_computing_out(my_proc_id);
228   }
229 };
230
231 class TestAction : public ReplayAction<ActionArgParser> {
232 public:
233   TestAction() : ReplayAction("Test") {}
234   void kernel(simgrid::xbt::ReplayAction& action) override
235   {
236     MPI_Request request = get_reqq_self()->back();
237     get_reqq_self()->pop_back();
238     // if request is null here, this may mean that a previous test has succeeded
239     // Different times in traced application and replayed version may lead to this
240     // In this case, ignore the extra calls.
241     if (request != nullptr) {
242       TRACE_smpi_testing_in(my_proc_id);
243
244       MPI_Status status;
245       int flag = Request::test(&request, &status);
246
247       XBT_DEBUG("MPI_Test result: %d", flag);
248       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
249        * nullptr.*/
250       get_reqq_self()->push_back(request);
251
252       TRACE_smpi_testing_out(my_proc_id);
253     }
254   }
255 };
256
257 } // Replay Namespace
258
259 static void action_init(simgrid::xbt::ReplayAction& action)
260 {
261   XBT_DEBUG("Initialize the counters");
262   CHECK_ACTION_PARAMS(action, 0, 1)
263   if (action.size() > 2)
264     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
265   else
266     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
267
268   /* start a simulated timer */
269   smpi_process()->simulated_start();
270   /*initialize the number of active processes */
271   active_processes = smpi_process_count();
272
273   set_reqq_self(new std::vector<MPI_Request>);
274 }
275
276 static void action_finalize(simgrid::xbt::ReplayAction& action)
277 {
278   /* Nothing to do */
279 }
280
281 static void action_comm_size(simgrid::xbt::ReplayAction& action)
282 {
283   log_timed_action (action, smpi_process()->simulated_elapsed());
284 }
285
286 static void action_comm_split(simgrid::xbt::ReplayAction& action)
287 {
288   log_timed_action (action, smpi_process()->simulated_elapsed());
289 }
290
291 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
292 {
293   log_timed_action (action, smpi_process()->simulated_elapsed());
294 }
295
296 static void action_waitall(simgrid::xbt::ReplayAction& action)
297 {
298   CHECK_ACTION_PARAMS(action, 0, 0)
299   double clock = smpi_process()->simulated_elapsed();
300   const unsigned int count_requests = get_reqq_self()->size();
301
302   if (count_requests>0) {
303     MPI_Status status[count_requests];
304
305     int my_proc_id_traced = Actor::self()->getPid();
306     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
307                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
308     int recvs_snd[count_requests];
309     int recvs_rcv[count_requests];
310     for (unsigned int i = 0; i < count_requests; i++) {
311       const auto& req = (*get_reqq_self())[i];
312       if (req && (req->flags() & RECV)) {
313         recvs_snd[i] = req->src();
314         recvs_rcv[i] = req->dst();
315       } else
316         recvs_snd[i] = -100;
317    }
318    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
319
320    for (unsigned i = 0; i < count_requests; i++) {
321      if (recvs_snd[i]!=-100)
322        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
323    }
324    TRACE_smpi_comm_out(my_proc_id_traced);
325   }
326   log_timed_action (action, clock);
327 }
328
329 static void action_barrier(simgrid::xbt::ReplayAction& action)
330 {
331   double clock = smpi_process()->simulated_elapsed();
332   int my_proc_id = Actor::self()->getPid();
333   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
334
335   Colls::barrier(MPI_COMM_WORLD);
336
337   TRACE_smpi_comm_out(my_proc_id);
338   log_timed_action (action, clock);
339 }
340
341 static void action_bcast(simgrid::xbt::ReplayAction& action)
342 {
343   CHECK_ACTION_PARAMS(action, 1, 2)
344   double size = parse_double(action[2]);
345   double clock = smpi_process()->simulated_elapsed();
346   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
347   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
348   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
349
350   int my_proc_id = Actor::self()->getPid();
351   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
352                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
353                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
354
355   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
356
357   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
358
359   TRACE_smpi_comm_out(my_proc_id);
360   log_timed_action (action, clock);
361 }
362
363 static void action_reduce(simgrid::xbt::ReplayAction& action)
364 {
365   CHECK_ACTION_PARAMS(action, 2, 2)
366   double comm_size = parse_double(action[2]);
367   double comp_size = parse_double(action[3]);
368   double clock = smpi_process()->simulated_elapsed();
369   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
370
371   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
372
373   int my_proc_id = Actor::self()->getPid();
374   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
375                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
376                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
377
378   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
379   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
380   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
381   smpi_execute_flops(comp_size);
382
383   TRACE_smpi_comm_out(my_proc_id);
384   log_timed_action (action, clock);
385 }
386
387 static void action_allReduce(simgrid::xbt::ReplayAction& action)
388 {
389   CHECK_ACTION_PARAMS(action, 2, 1)
390   double comm_size = parse_double(action[2]);
391   double comp_size = parse_double(action[3]);
392
393   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
394
395   double clock = smpi_process()->simulated_elapsed();
396   int my_proc_id = Actor::self()->getPid();
397   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
398                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
399
400   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
401   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
402   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
403   smpi_execute_flops(comp_size);
404
405   TRACE_smpi_comm_out(my_proc_id);
406   log_timed_action (action, clock);
407 }
408
409 static void action_allToAll(simgrid::xbt::ReplayAction& action)
410 {
411   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
412   double clock = smpi_process()->simulated_elapsed();
413   unsigned long comm_size = MPI_COMM_WORLD->size();
414   int send_size = parse_double(action[2]);
415   int recv_size = parse_double(action[3]);
416   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
417   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
418
419   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
420   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
421
422   int my_proc_id = Actor::self()->getPid();
423   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
424                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
425                                                     Datatype::encode(MPI_CURRENT_TYPE),
426                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
427
428   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
429
430   TRACE_smpi_comm_out(my_proc_id);
431   log_timed_action (action, clock);
432 }
433
434 static void action_gather(simgrid::xbt::ReplayAction& action)
435 {
436   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
437         0 gather 68 68 0 0 0
438       where:
439         1) 68 is the sendcounts
440         2) 68 is the recvcounts
441         3) 0 is the root node
442         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
443         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
444   */
445   CHECK_ACTION_PARAMS(action, 2, 3)
446   double clock = smpi_process()->simulated_elapsed();
447   unsigned long comm_size = MPI_COMM_WORLD->size();
448   int send_size = parse_double(action[2]);
449   int recv_size = parse_double(action[3]);
450   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
451   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
452
453   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
454   void *recv = nullptr;
455   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
456   int rank = MPI_COMM_WORLD->rank();
457
458   if(rank==root)
459     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
460
461   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
462                                                                         Datatype::encode(MPI_CURRENT_TYPE),
463                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
464
465   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
466
467   TRACE_smpi_comm_out(Actor::self()->getPid());
468   log_timed_action (action, clock);
469 }
470
471 static void action_scatter(simgrid::xbt::ReplayAction& action)
472 {
473   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
474         0 gather 68 68 0 0 0
475       where:
476         1) 68 is the sendcounts
477         2) 68 is the recvcounts
478         3) 0 is the root node
479         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
480         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
481   */
482   CHECK_ACTION_PARAMS(action, 2, 3)
483   double clock                   = smpi_process()->simulated_elapsed();
484   unsigned long comm_size        = MPI_COMM_WORLD->size();
485   int send_size                  = parse_double(action[2]);
486   int recv_size                  = parse_double(action[3]);
487   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
488   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
489
490   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
491   void* recv = nullptr;
492   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
493   int rank = MPI_COMM_WORLD->rank();
494
495   if (rank == root)
496     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
497
498   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
499                                                                         Datatype::encode(MPI_CURRENT_TYPE),
500                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
501
502   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
503
504   TRACE_smpi_comm_out(Actor::self()->getPid());
505   log_timed_action(action, clock);
506 }
507
508 static void action_gatherv(simgrid::xbt::ReplayAction& action)
509 {
510   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
511        0 gather 68 68 10 10 10 0 0 0
512      where:
513        1) 68 is the sendcount
514        2) 68 10 10 10 is the recvcounts
515        3) 0 is the root node
516        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
517        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
518   */
519   double clock = smpi_process()->simulated_elapsed();
520   unsigned long comm_size = MPI_COMM_WORLD->size();
521   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
522   int send_size = parse_double(action[2]);
523   std::vector<int> disps(comm_size, 0);
524   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
525
526   MPI_Datatype MPI_CURRENT_TYPE =
527       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
528   MPI_Datatype MPI_CURRENT_TYPE2{
529       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
530
531   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
532   void *recv = nullptr;
533   for (unsigned int i = 0; i < comm_size; i++) {
534     (*recvcounts)[i] = std::stoi(action[i + 3]);
535   }
536   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
537
538   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
539   int rank = MPI_COMM_WORLD->rank();
540
541   if(rank==root)
542     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
543
544   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
545                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
546                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
547
548   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
549                  MPI_COMM_WORLD);
550
551   TRACE_smpi_comm_out(Actor::self()->getPid());
552   log_timed_action (action, clock);
553 }
554
555 static void action_scatterv(simgrid::xbt::ReplayAction& action)
556 {
557   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
558        0 gather 68 10 10 10 68 0 0 0
559      where:
560        1) 68 10 10 10 is the sendcounts
561        2) 68 is the recvcount
562        3) 0 is the root node
563        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
564        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
565   */
566   double clock  = smpi_process()->simulated_elapsed();
567   unsigned long comm_size = MPI_COMM_WORLD->size();
568   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
569   int recv_size = parse_double(action[2 + comm_size]);
570   std::vector<int> disps(comm_size, 0);
571   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
572
573   MPI_Datatype MPI_CURRENT_TYPE =
574       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
575   MPI_Datatype MPI_CURRENT_TYPE2{
576       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
577
578   void* send = nullptr;
579   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
580   for (unsigned int i = 0; i < comm_size; i++) {
581     (*sendcounts)[i] = std::stoi(action[i + 2]);
582   }
583   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
584
585   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
586   int rank = MPI_COMM_WORLD->rank();
587
588   if (rank == root)
589     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
590
591   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
592                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
593                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
594
595   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
596                   MPI_COMM_WORLD);
597
598   TRACE_smpi_comm_out(Actor::self()->getPid());
599   log_timed_action(action, clock);
600 }
601
602 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
603 {
604   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
605        0 reduceScatter 275427 275427 275427 204020 11346849 0
606      where:
607        1) The first four values after the name of the action declare the recvcounts array
608        2) The value 11346849 is the amount of instructions
609        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
610  */
611   double clock = smpi_process()->simulated_elapsed();
612   unsigned long comm_size = MPI_COMM_WORLD->size();
613   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
614   int comp_size = parse_double(action[2+comm_size]);
615   int my_proc_id                     = Actor::self()->getPid();
616   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
617   MPI_Datatype MPI_CURRENT_TYPE =
618       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
619
620   for (unsigned int i = 0; i < comm_size; i++) {
621     recvcounts->push_back(std::stoi(action[i + 2]));
622   }
623   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
624
625   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
626                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
627                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
628                                                        Datatype::encode(MPI_CURRENT_TYPE)));
629
630   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
631   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
632
633   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
634   smpi_execute_flops(comp_size);
635
636   TRACE_smpi_comm_out(my_proc_id);
637   log_timed_action (action, clock);
638 }
639
640 static void action_allgather(simgrid::xbt::ReplayAction& action)
641 {
642   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
643         0 allGather 275427 275427
644     where:
645         1) 275427 is the sendcount
646         2) 275427 is the recvcount
647         3) No more values mean that the datatype for sent and receive buffer is the default one, see
648     simgrid::smpi::Datatype::decode().
649   */
650   double clock = smpi_process()->simulated_elapsed();
651
652   CHECK_ACTION_PARAMS(action, 2, 2)
653   int sendcount = std::stoi(action[2]);
654   int recvcount = std::stoi(action[3]);
655
656   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
657   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
658
659   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
660   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
661
662   int my_proc_id = Actor::self()->getPid();
663
664   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
665                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
666                                                     Datatype::encode(MPI_CURRENT_TYPE),
667                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
668
669   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
670
671   TRACE_smpi_comm_out(my_proc_id);
672   log_timed_action (action, clock);
673 }
674
675 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
676 {
677   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
678         0 allGatherV 275427 275427 275427 275427 204020
679      where:
680         1) 275427 is the sendcount
681         2) The next four elements declare the recvcounts array
682         3) No more values mean that the datatype for sent and receive buffer is the default one, see
683      simgrid::smpi::Datatype::decode().
684   */
685   double clock = smpi_process()->simulated_elapsed();
686
687   unsigned long comm_size = MPI_COMM_WORLD->size();
688   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
689   int sendcount = std::stoi(action[2]);
690   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
691   std::vector<int> disps(comm_size, 0);
692
693   int datatype_index = 0, disp_index = 0;
694   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
695     datatype_index = 3 + comm_size;
696     disp_index     = datatype_index + 1;
697   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
698     datatype_index = -1;
699     disp_index     = 3 + comm_size;
700   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
701     datatype_index = 3 + comm_size;
702   }
703
704   if (disp_index != 0) {
705     for (unsigned int i = 0; i < comm_size; i++)
706       disps[i]          = std::stoi(action[disp_index + i]);
707   }
708
709   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
710                                                      : MPI_DEFAULT_TYPE};
711   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
712                                                       : MPI_DEFAULT_TYPE};
713
714   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
715
716   for (unsigned int i = 0; i < comm_size; i++) {
717     (*recvcounts)[i] = std::stoi(action[i + 3]);
718   }
719   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
720   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
721
722   int my_proc_id = Actor::self()->getPid();
723
724   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
725                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
726                                                        Datatype::encode(MPI_CURRENT_TYPE),
727                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
728
729   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
730                     MPI_COMM_WORLD);
731
732   TRACE_smpi_comm_out(my_proc_id);
733   log_timed_action (action, clock);
734 }
735
736 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
737 {
738   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
739         0 allToAllV 100 1 7 10 12 100 1 70 10 5
740      where:
741         1) 100 is the size of the send buffer *sizeof(int),
742         2) 1 7 10 12 is the sendcounts array
743         3) 100*sizeof(int) is the size of the receiver buffer
744         4)  1 70 10 5 is the recvcounts array
745   */
746   double clock = smpi_process()->simulated_elapsed();
747
748   unsigned long comm_size = MPI_COMM_WORLD->size();
749   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
750   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
751   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
752   std::vector<int> senddisps(comm_size, 0);
753   std::vector<int> recvdisps(comm_size, 0);
754
755   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
756                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
757                                       : MPI_DEFAULT_TYPE;
758   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
759                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
760                                      : MPI_DEFAULT_TYPE};
761
762   int send_buf_size=parse_double(action[2]);
763   int recv_buf_size=parse_double(action[3+comm_size]);
764   int my_proc_id = Actor::self()->getPid();
765   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
766   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
767
768   for (unsigned int i = 0; i < comm_size; i++) {
769     (*sendcounts)[i] = std::stoi(action[3 + i]);
770     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
771   }
772   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
773   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
774
775   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
776                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
777                                                        Datatype::encode(MPI_CURRENT_TYPE),
778                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
779
780   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
781                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
782
783   TRACE_smpi_comm_out(my_proc_id);
784   log_timed_action (action, clock);
785 }
786
787 }} // namespace simgrid::smpi
788
789 /** @brief Only initialize the replay, don't do it for real */
790 void smpi_replay_init(int* argc, char*** argv)
791 {
792   simgrid::smpi::Process::init(argc, argv);
793   smpi_process()->mark_as_initialized();
794   smpi_process()->set_replaying(true);
795
796   int my_proc_id = Actor::self()->getPid();
797   TRACE_smpi_init(my_proc_id);
798   TRACE_smpi_computing_init(my_proc_id);
799   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
800   TRACE_smpi_comm_out(my_proc_id);
801   xbt_replay_action_register("init",       simgrid::smpi::action_init);
802   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
803   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
804   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
805   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
806
807   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
808   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
809   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
810   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
811   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
812   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
813   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
814   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
815   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
816   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
817   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
818   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
819   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
820   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
821   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
822   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
823   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
824   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
825   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
826   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
827   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
828
829   //if we have a delayed start, sleep here.
830   if(*argc>2){
831     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
832     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
833     smpi_execute_flops(value);
834   } else {
835     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
836     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
837     smpi_execute_flops(0.0);
838   }
839 }
840
841 /** @brief actually run the replay after initialization */
842 void smpi_replay_main(int* argc, char*** argv)
843 {
844   simgrid::xbt::replay_runner(*argc, *argv);
845
846   /* and now, finalize everything */
847   /* One active process will stop. Decrease the counter*/
848   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
849   if (not get_reqq_self()->empty()) {
850     unsigned int count_requests=get_reqq_self()->size();
851     MPI_Request requests[count_requests];
852     MPI_Status status[count_requests];
853     unsigned int i=0;
854
855     for (auto const& req : *get_reqq_self()) {
856       requests[i] = req;
857       i++;
858     }
859     simgrid::smpi::Request::waitall(count_requests, requests, status);
860   }
861   delete get_reqq_self();
862   active_processes--;
863
864   if(active_processes==0){
865     /* Last process alive speaking: end the simulated timer */
866     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
867     smpi_free_replay_tmp_buffers();
868   }
869
870   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
871
872   smpi_process()->finalize();
873
874   TRACE_smpi_comm_out(Actor::self()->getPid());
875   TRACE_smpi_finalize(Actor::self()->getPid());
876 }
877
878 /** @brief chain a replay initialization and a replay start */
879 void smpi_replay_run(int* argc, char*** argv)
880 {
881   smpi_replay_init(argc, argv);
882   smpi_replay_main(argc, argv);
883 }