Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: C++-ify (all)gather action
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 class CollCommParser : public ActionArgParser {
104 public:
105   double size;
106   double comm_size;
107   double comp_size;
108   int send_size;
109   int recv_size;
110   int root = 0;
111   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
113 };
114
115 class BcastArgParser : public CollCommParser {
116 public:
117   void parse(simgrid::xbt::ReplayAction& action) override
118   {
119     CHECK_ACTION_PARAMS(action, 1, 2)
120     size = parse_double(action[2]);
121     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122     if (action.size() > 4)
123       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
124   }
125 };
126
127 class ReduceArgParser : public CollCommParser {
128 public:
129   void parse(simgrid::xbt::ReplayAction& action) override
130   {
131     CHECK_ACTION_PARAMS(action, 2, 2)
132     comm_size = parse_double(action[2]);
133     comp_size = parse_double(action[3]);
134     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
135     if (action.size() > 5)
136       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
137   }
138 };
139
140 class AllReduceArgParser : public CollCommParser {
141 public:
142   void parse(simgrid::xbt::ReplayAction& action) override
143   {
144     CHECK_ACTION_PARAMS(action, 2, 1)
145     comm_size = parse_double(action[2]);
146     comp_size = parse_double(action[3]);
147     if (action.size() > 4)
148       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
149   }
150 };
151
152 class AllToAllArgParser : public CollCommParser {
153 public:
154   void parse(simgrid::xbt::ReplayAction& action) override
155   {
156     CHECK_ACTION_PARAMS(action, 2, 1)
157     comm_size = MPI_COMM_WORLD->size();
158     send_size = parse_double(action[2]);
159     recv_size = parse_double(action[3]);
160
161     if (action.size() > 4)
162       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163     if (action.size() > 5)
164       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
165   }
166 };
167
168 class GatherParser : public CollCommParser {
169 public:
170   void parse(simgrid::xbt::ReplayAction& action) override
171   {
172     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
173           0 gather 68 68 0 0 0
174         where:
175           1) 68 is the sendcounts
176           2) 68 is the recvcounts
177           3) 0 is the root node
178           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
179           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
180     */
181     CHECK_ACTION_PARAMS(action, 2, 3)
182     comm_size = MPI_COMM_WORLD->size();
183     send_size = parse_double(action[2]);
184     recv_size = parse_double(action[3]);
185     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
186
187     if (action.size() > 5)
188       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
189     if (action.size() > 6)
190       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
191   }
192 };
193
194 template <class T> class ReplayAction {
195 protected:
196   const std::string name;
197   T args;
198
199   int my_proc_id;
200
201 public:
202   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
203
204   virtual void execute(simgrid::xbt::ReplayAction& action)
205   {
206     // Needs to be re-initialized for every action, hence here
207     double start_time = smpi_process()->simulated_elapsed();
208     args.parse(action);
209     kernel(action);
210     if (name != "Init")
211       log_timed_action(action, start_time);
212   }
213
214   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
215
216   void* send_buffer(int size)
217   {
218     return smpi_get_tmp_sendbuffer(size);
219   }
220
221   void* recv_buffer(int size)
222   {
223     return smpi_get_tmp_recvbuffer(size);
224   }
225 };
226
227 class WaitAction : public ReplayAction<ActionArgParser> {
228 public:
229   WaitAction() : ReplayAction("Wait") {}
230   void kernel(simgrid::xbt::ReplayAction& action) override
231   {
232     std::string s = boost::algorithm::join(action, " ");
233     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
234     MPI_Request request = get_reqq_self()->back();
235     get_reqq_self()->pop_back();
236
237     if (request == nullptr) {
238       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
239        * return.*/
240       return;
241     }
242
243     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
244
245     // Must be taken before Request::wait() since the request may be set to
246     // MPI_REQUEST_NULL by Request::wait!
247     int src                  = request->comm()->group()->rank(request->src());
248     int dst                  = request->comm()->group()->rank(request->dst());
249     bool is_wait_for_receive = (request->flags() & RECV);
250     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
251     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
252
253     MPI_Status status;
254     Request::wait(&request, &status);
255
256     TRACE_smpi_comm_out(rank);
257     if (is_wait_for_receive)
258       TRACE_smpi_recv(src, dst, 0);
259   }
260 };
261
262 class SendAction : public ReplayAction<SendRecvParser> {
263 public:
264   SendAction() = delete;
265   SendAction(std::string name) : ReplayAction(name) {}
266   void kernel(simgrid::xbt::ReplayAction& action) override
267   {
268     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
269
270     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
271                                                                                  Datatype::encode(args.datatype1)));
272     if (not TRACE_smpi_view_internals())
273       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
274
275     if (name == "send") {
276       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
277     } else if (name == "Isend") {
278       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
279       get_reqq_self()->push_back(request);
280     } else {
281       xbt_die("Don't know this action, %s", name.c_str());
282     }
283
284     TRACE_smpi_comm_out(my_proc_id);
285   }
286 };
287
288 class RecvAction : public ReplayAction<SendRecvParser> {
289 public:
290   RecvAction() = delete;
291   explicit RecvAction(std::string name) : ReplayAction(name) {}
292   void kernel(simgrid::xbt::ReplayAction& action) override
293   {
294     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
295
296     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
297                                                                                  Datatype::encode(args.datatype1)));
298
299     MPI_Status status;
300     // unknown size from the receiver point of view
301     if (args.size <= 0.0) {
302       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
303       args.size = status.count;
304     }
305
306     if (name == "recv") {
307       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
308     } else if (name == "Irecv") {
309       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
310       get_reqq_self()->push_back(request);
311     }
312
313     TRACE_smpi_comm_out(my_proc_id);
314     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
315     if (name == "recv" && not TRACE_smpi_view_internals()) {
316       TRACE_smpi_recv(src_traced, my_proc_id, 0);
317     }
318   }
319 };
320
321 class ComputeAction : public ReplayAction<ComputeParser> {
322 public:
323   ComputeAction() : ReplayAction("compute") {}
324   void kernel(simgrid::xbt::ReplayAction& action) override
325   {
326     TRACE_smpi_computing_in(my_proc_id, args.flops);
327     smpi_execute_flops(args.flops);
328     TRACE_smpi_computing_out(my_proc_id);
329   }
330 };
331
332 class TestAction : public ReplayAction<ActionArgParser> {
333 public:
334   TestAction() : ReplayAction("Test") {}
335   void kernel(simgrid::xbt::ReplayAction& action) override
336   {
337     MPI_Request request = get_reqq_self()->back();
338     get_reqq_self()->pop_back();
339     // if request is null here, this may mean that a previous test has succeeded
340     // Different times in traced application and replayed version may lead to this
341     // In this case, ignore the extra calls.
342     if (request != nullptr) {
343       TRACE_smpi_testing_in(my_proc_id);
344
345       MPI_Status status;
346       int flag = Request::test(&request, &status);
347
348       XBT_DEBUG("MPI_Test result: %d", flag);
349       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
350        * nullptr.*/
351       get_reqq_self()->push_back(request);
352
353       TRACE_smpi_testing_out(my_proc_id);
354     }
355   }
356 };
357
358 class InitAction : public ReplayAction<ActionArgParser> {
359 public:
360   InitAction() : ReplayAction("Init") {}
361   void kernel(simgrid::xbt::ReplayAction& action) override
362   {
363     CHECK_ACTION_PARAMS(action, 0, 1)
364     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
365                                            : MPI_BYTE;  // default TAU datatype
366
367     /* start a simulated timer */
368     smpi_process()->simulated_start();
369     /*initialize the number of active processes */
370     active_processes = smpi_process_count();
371
372     set_reqq_self(new std::vector<MPI_Request>);
373   }
374 };
375
376 class CommunicatorAction : public ReplayAction<ActionArgParser> {
377 public:
378   CommunicatorAction() : ReplayAction("Comm") {}
379   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
380 };
381
382 class WaitAllAction : public ReplayAction<ActionArgParser> {
383 public:
384   WaitAllAction() : ReplayAction("waitAll") {}
385   void kernel(simgrid::xbt::ReplayAction& action) override
386   {
387     const unsigned int count_requests = get_reqq_self()->size();
388
389     if (count_requests > 0) {
390       TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
391                          new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
392       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
393       for (const auto& req : (*get_reqq_self())) {
394         if (req && (req->flags() & RECV)) {
395           sender_receiver.push_back({req->src(), req->dst()});
396         }
397       }
398       MPI_Status status[count_requests];
399       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
400
401       for (auto& pair : sender_receiver) {
402         TRACE_smpi_recv(pair.first, pair.second, 0);
403       }
404       TRACE_smpi_comm_out(my_proc_id);
405     }
406   }
407 };
408
409 class BarrierAction : public ReplayAction<ActionArgParser> {
410 public:
411   BarrierAction() : ReplayAction("barrier") {}
412   void kernel(simgrid::xbt::ReplayAction& action) override
413   {
414     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
415     Colls::barrier(MPI_COMM_WORLD);
416     TRACE_smpi_comm_out(my_proc_id);
417   }
418 };
419
420 class BcastAction : public ReplayAction<BcastArgParser> {
421 public:
422   BcastAction() : ReplayAction("bcast") {}
423   void kernel(simgrid::xbt::ReplayAction& action) override
424   {
425     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
426                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
427                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
428
429     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
430
431     TRACE_smpi_comm_out(my_proc_id);
432   }
433 };
434
435 class ReduceAction : public ReplayAction<ReduceArgParser> {
436 public:
437   ReduceAction() : ReplayAction("reduce") {}
438   void kernel(simgrid::xbt::ReplayAction& action) override
439   {
440     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
441                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
442                                                       args.comm_size, -1, Datatype::encode(args.datatype1), ""));
443
444     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
445         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
446     smpi_execute_flops(args.comp_size);
447
448     TRACE_smpi_comm_out(my_proc_id);
449   }
450 };
451
452 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
453 public:
454   AllReduceAction() : ReplayAction("allReduce") {}
455   void kernel(simgrid::xbt::ReplayAction& action) override
456   {
457     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
458                                                                                 Datatype::encode(args.datatype1), ""));
459
460     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
461         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
462     smpi_execute_flops(args.comp_size);
463
464     TRACE_smpi_comm_out(my_proc_id);
465   }
466 };
467
468 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
469 public:
470   AllToAllAction() : ReplayAction("allToAll") {}
471   void kernel(simgrid::xbt::ReplayAction& action) override
472   {
473     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
474                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
475                                                     Datatype::encode(args.datatype1),
476                                                     Datatype::encode(args.datatype2)));
477
478     Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()), 
479       args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
480       args.recv_size, args.datatype2, MPI_COMM_WORLD);
481
482     TRACE_smpi_comm_out(my_proc_id);
483   }
484 };
485
486 class GatherAction : public ReplayAction<GatherArgParser> {
487 public:
488   GatherAction(std::string name) : ReplayAction(name) {}
489   void kernel(simgrid::xbt::ReplayAction& action) override
490   {
491     TRACE_smpi_comm_in(my_proc_id, name, new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
492                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
493
494     if (name == "gather")
495       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
496                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
497     else
498       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
499                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
500
501     TRACE_smpi_comm_out(my_proc_id);
502   }
503 };
504 } // Replay Namespace
505
506 static void action_scatter(simgrid::xbt::ReplayAction& action)
507 {
508   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
509         0 gather 68 68 0 0 0
510       where:
511         1) 68 is the sendcounts
512         2) 68 is the recvcounts
513         3) 0 is the root node
514         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
515         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
516   */
517   CHECK_ACTION_PARAMS(action, 2, 3)
518   double clock                   = smpi_process()->simulated_elapsed();
519   unsigned long comm_size        = MPI_COMM_WORLD->size();
520   int send_size                  = parse_double(action[2]);
521   int recv_size                  = parse_double(action[3]);
522   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
523   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
524
525   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
526   void* recv = nullptr;
527   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
528   int rank = MPI_COMM_WORLD->rank();
529
530   if (rank == root)
531     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
532
533   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
534                                                                         Datatype::encode(MPI_CURRENT_TYPE),
535                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
536
537   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
538
539   TRACE_smpi_comm_out(Actor::self()->getPid());
540   log_timed_action(action, clock);
541 }
542
543 static void action_gatherv(simgrid::xbt::ReplayAction& action)
544 {
545   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
546        0 gather 68 68 10 10 10 0 0 0
547      where:
548        1) 68 is the sendcount
549        2) 68 10 10 10 is the recvcounts
550        3) 0 is the root node
551        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
552        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
553   */
554   double clock = smpi_process()->simulated_elapsed();
555   unsigned long comm_size = MPI_COMM_WORLD->size();
556   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
557   int send_size = parse_double(action[2]);
558   std::vector<int> disps(comm_size, 0);
559   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
560
561   MPI_Datatype MPI_CURRENT_TYPE =
562       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
563   MPI_Datatype MPI_CURRENT_TYPE2{
564       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
565
566   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
567   void *recv = nullptr;
568   for (unsigned int i = 0; i < comm_size; i++) {
569     (*recvcounts)[i] = std::stoi(action[i + 3]);
570   }
571   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
572
573   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
574   int rank = MPI_COMM_WORLD->rank();
575
576   if(rank==root)
577     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
578
579   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
580                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
581                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
582
583   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
584                  MPI_COMM_WORLD);
585
586   TRACE_smpi_comm_out(Actor::self()->getPid());
587   log_timed_action (action, clock);
588 }
589
590 static void action_scatterv(simgrid::xbt::ReplayAction& action)
591 {
592   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
593        0 gather 68 10 10 10 68 0 0 0
594      where:
595        1) 68 10 10 10 is the sendcounts
596        2) 68 is the recvcount
597        3) 0 is the root node
598        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
599        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
600   */
601   double clock  = smpi_process()->simulated_elapsed();
602   unsigned long comm_size = MPI_COMM_WORLD->size();
603   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
604   int recv_size = parse_double(action[2 + comm_size]);
605   std::vector<int> disps(comm_size, 0);
606   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
607
608   MPI_Datatype MPI_CURRENT_TYPE =
609       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
610   MPI_Datatype MPI_CURRENT_TYPE2{
611       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
612
613   void* send = nullptr;
614   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
615   for (unsigned int i = 0; i < comm_size; i++) {
616     (*sendcounts)[i] = std::stoi(action[i + 2]);
617   }
618   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
619
620   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
621   int rank = MPI_COMM_WORLD->rank();
622
623   if (rank == root)
624     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
625
626   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
627                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
628                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
629
630   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
631                   MPI_COMM_WORLD);
632
633   TRACE_smpi_comm_out(Actor::self()->getPid());
634   log_timed_action(action, clock);
635 }
636
637 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
638 {
639   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
640        0 reduceScatter 275427 275427 275427 204020 11346849 0
641      where:
642        1) The first four values after the name of the action declare the recvcounts array
643        2) The value 11346849 is the amount of instructions
644        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
645  */
646   double clock = smpi_process()->simulated_elapsed();
647   unsigned long comm_size = MPI_COMM_WORLD->size();
648   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
649   int comp_size = parse_double(action[2+comm_size]);
650   int my_proc_id                     = Actor::self()->getPid();
651   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
652   MPI_Datatype MPI_CURRENT_TYPE =
653       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
654
655   for (unsigned int i = 0; i < comm_size; i++) {
656     recvcounts->push_back(std::stoi(action[i + 2]));
657   }
658   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
659
660   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
661                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
662                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
663                                                        Datatype::encode(MPI_CURRENT_TYPE)));
664
665   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
666   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
667
668   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
669   smpi_execute_flops(comp_size);
670
671   TRACE_smpi_comm_out(my_proc_id);
672   log_timed_action (action, clock);
673 }
674
675 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
676 {
677   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
678         0 allGatherV 275427 275427 275427 275427 204020
679      where:
680         1) 275427 is the sendcount
681         2) The next four elements declare the recvcounts array
682         3) No more values mean that the datatype for sent and receive buffer is the default one, see
683      simgrid::smpi::Datatype::decode().
684   */
685   double clock = smpi_process()->simulated_elapsed();
686
687   unsigned long comm_size = MPI_COMM_WORLD->size();
688   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
689   int sendcount = std::stoi(action[2]);
690   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
691   std::vector<int> disps(comm_size, 0);
692
693   int datatype_index = 0, disp_index = 0;
694   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
695     datatype_index = 3 + comm_size;
696     disp_index     = datatype_index + 1;
697   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
698     datatype_index = -1;
699     disp_index     = 3 + comm_size;
700   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
701     datatype_index = 3 + comm_size;
702   }
703
704   if (disp_index != 0) {
705     for (unsigned int i = 0; i < comm_size; i++)
706       disps[i]          = std::stoi(action[disp_index + i]);
707   }
708
709   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
710                                                      : MPI_DEFAULT_TYPE};
711   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
712                                                       : MPI_DEFAULT_TYPE};
713
714   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
715
716   for (unsigned int i = 0; i < comm_size; i++) {
717     (*recvcounts)[i] = std::stoi(action[i + 3]);
718   }
719   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
720   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
721
722   int my_proc_id = Actor::self()->getPid();
723
724   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
725                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
726                                                        Datatype::encode(MPI_CURRENT_TYPE),
727                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
728
729   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
730                     MPI_COMM_WORLD);
731
732   TRACE_smpi_comm_out(my_proc_id);
733   log_timed_action (action, clock);
734 }
735
736 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
737 {
738   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
739         0 allToAllV 100 1 7 10 12 100 1 70 10 5
740      where:
741         1) 100 is the size of the send buffer *sizeof(int),
742         2) 1 7 10 12 is the sendcounts array
743         3) 100*sizeof(int) is the size of the receiver buffer
744         4)  1 70 10 5 is the recvcounts array
745   */
746   double clock = smpi_process()->simulated_elapsed();
747
748   unsigned long comm_size = MPI_COMM_WORLD->size();
749   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
750   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
751   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
752   std::vector<int> senddisps(comm_size, 0);
753   std::vector<int> recvdisps(comm_size, 0);
754
755   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
756                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
757                                       : MPI_DEFAULT_TYPE;
758   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
759                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
760                                      : MPI_DEFAULT_TYPE};
761
762   int send_buf_size=parse_double(action[2]);
763   int recv_buf_size=parse_double(action[3+comm_size]);
764   int my_proc_id = Actor::self()->getPid();
765   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
766   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
767
768   for (unsigned int i = 0; i < comm_size; i++) {
769     (*sendcounts)[i] = std::stoi(action[3 + i]);
770     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
771   }
772   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
773   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
774
775   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
776                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
777                                                        Datatype::encode(MPI_CURRENT_TYPE),
778                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
779
780   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
781                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
782
783   TRACE_smpi_comm_out(my_proc_id);
784   log_timed_action (action, clock);
785 }
786
787 }} // namespace simgrid::smpi
788
789 /** @brief Only initialize the replay, don't do it for real */
790 void smpi_replay_init(int* argc, char*** argv)
791 {
792   simgrid::smpi::Process::init(argc, argv);
793   smpi_process()->mark_as_initialized();
794   smpi_process()->set_replaying(true);
795
796   int my_proc_id = Actor::self()->getPid();
797   TRACE_smpi_init(my_proc_id);
798   TRACE_smpi_computing_init(my_proc_id);
799   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
800   TRACE_smpi_comm_out(my_proc_id);
801   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
802   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
803   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
804   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
805   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
806
807   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
808   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
809   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
810   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
811   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
812   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
813   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
814   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
815   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
816   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
817   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
818   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
819   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
820   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
821   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
822   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
823   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
824   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
825   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
826   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
827   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
828
829   //if we have a delayed start, sleep here.
830   if(*argc>2){
831     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
832     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
833     smpi_execute_flops(value);
834   } else {
835     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
836     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
837     smpi_execute_flops(0.0);
838   }
839 }
840
841 /** @brief actually run the replay after initialization */
842 void smpi_replay_main(int* argc, char*** argv)
843 {
844   simgrid::xbt::replay_runner(*argc, *argv);
845
846   /* and now, finalize everything */
847   /* One active process will stop. Decrease the counter*/
848   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
849   if (not get_reqq_self()->empty()) {
850     unsigned int count_requests=get_reqq_self()->size();
851     MPI_Request requests[count_requests];
852     MPI_Status status[count_requests];
853     unsigned int i=0;
854
855     for (auto const& req : *get_reqq_self()) {
856       requests[i] = req;
857       i++;
858     }
859     simgrid::smpi::Request::waitall(count_requests, requests, status);
860   }
861   delete get_reqq_self();
862   active_processes--;
863
864   if(active_processes==0){
865     /* Last process alive speaking: end the simulated timer */
866     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
867     smpi_free_replay_tmp_buffers();
868   }
869
870   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
871
872   smpi_process()->finalize();
873
874   TRACE_smpi_comm_out(Actor::self()->getPid());
875   TRACE_smpi_finalize(Actor::self()->getPid());
876 }
877
878 /** @brief chain a replay initialization and a replay start */
879 void smpi_replay_run(int* argc, char*** argv)
880 {
881   smpi_replay_init(argc, argv);
882   smpi_replay_main(argc, argv);
883 }