Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Cosmetics
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 {
44   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45     std::string s = boost::algorithm::join(action, " ");
46     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
47   }
48 }
49
50 static std::vector<MPI_Request>* get_reqq_self()
51 {
52   return reqq.at(Actor::self()->getPid());
53 }
54
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 {
57    reqq.insert({Actor::self()->getPid(), mpi_request});
58 }
59
60 /* Helper function */
61 static double parse_double(std::string string)
62 {
63   return xbt_str_parse_double(string.c_str(), "%s is not a double");
64 }
65
66 namespace simgrid {
67 namespace smpi {
68
69 namespace Replay {
70 class ActionArgParser {
71 public:
72   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
73 };
74
75 class SendRecvParser : public ActionArgParser {
76 public:
77   /* communication partner; if we send, this is the receiver and vice versa */
78   int partner;
79   double size;
80   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81
82   void parse(simgrid::xbt::ReplayAction& action) override
83   {
84     CHECK_ACTION_PARAMS(action, 2, 1)
85     partner = std::stoi(action[2]);
86     size    = parse_double(action[3]);
87     if (action.size() > 4)
88       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
89   }
90 };
91
92 class ComputeParser : public ActionArgParser {
93 public:
94   /* communication partner; if we send, this is the receiver and vice versa */
95   double flops;
96
97   void parse(simgrid::xbt::ReplayAction& action) override
98   {
99     CHECK_ACTION_PARAMS(action, 1, 0)
100     flops = parse_double(action[2]);
101   }
102 };
103
104 template <class T> class ReplayAction {
105 protected:
106   const std::string name;
107   T args;
108
109   int my_proc_id;
110
111 public:
112   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
113
114   virtual void execute(simgrid::xbt::ReplayAction& action)
115   {
116     // Needs to be re-initialized for every action, hence here
117     double start_time = smpi_process()->simulated_elapsed();
118     args.parse(action);
119     kernel(action);
120     log_timed_action(action, start_time);
121   }
122
123   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
124 };
125
126 class WaitAction : public ReplayAction<ActionArgParser> {
127 public:
128   WaitAction() : ReplayAction("Wait") {}
129   void kernel(simgrid::xbt::ReplayAction& action) override
130   {
131     CHECK_ACTION_PARAMS(action, 0, 0)
132     MPI_Status status;
133
134     std::string s = boost::algorithm::join(action, " ");
135     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
136     MPI_Request request = get_reqq_self()->back();
137     get_reqq_self()->pop_back();
138
139     if (request == nullptr) {
140       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
141        * return.*/
142       return;
143     }
144
145     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
146
147     MPI_Group group          = request->comm()->group();
148     int src_traced           = group->rank(request->src());
149     int dst_traced           = group->rank(request->dst());
150     bool is_wait_for_receive = (request->flags() & RECV);
151     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
152     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
153
154     Request::wait(&request, &status);
155
156     TRACE_smpi_comm_out(rank);
157     if (is_wait_for_receive)
158       TRACE_smpi_recv(src_traced, dst_traced, 0);
159   }
160 };
161
162 class SendAction : public ReplayAction<SendRecvParser> {
163 public:
164   SendAction() = delete;
165   SendAction(std::string name) : ReplayAction(name) {}
166   void kernel(simgrid::xbt::ReplayAction& action) override
167   {
168     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
169
170     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
171                                                                                  Datatype::encode(args.datatype1)));
172     if (not TRACE_smpi_view_internals())
173       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
174
175     if (name == "send") {
176       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
177     } else if (name == "Isend") {
178       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
179       get_reqq_self()->push_back(request);
180     } else {
181       xbt_die("Don't know this action, %s", name.c_str());
182     }
183
184     TRACE_smpi_comm_out(my_proc_id);
185   }
186 };
187
188 class RecvAction : public ReplayAction<SendRecvParser> {
189 public:
190   RecvAction() = delete;
191   explicit RecvAction(std::string name) : ReplayAction(name) {}
192   void kernel(simgrid::xbt::ReplayAction& action) override
193   {
194     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
195
196     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
197                                                                                  Datatype::encode(args.datatype1)));
198
199     MPI_Status status;
200     // unknown size from the receiver point of view
201     if (args.size <= 0.0) {
202       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
203       args.size = status.count;
204     }
205
206     if (name == "recv") {
207       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
208     } else if (name == "Irecv") {
209       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
210       get_reqq_self()->push_back(request);
211     }
212
213     TRACE_smpi_comm_out(my_proc_id);
214     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
215     if (name == "recv" && not TRACE_smpi_view_internals()) {
216       TRACE_smpi_recv(src_traced, my_proc_id, 0);
217     }
218   }
219 };
220
221 class ComputeAction : public ReplayAction<ComputeParser> {
222 public:
223   ComputeAction() : ReplayAction("compute") {}
224   void kernel(simgrid::xbt::ReplayAction& action) override
225   {
226     TRACE_smpi_computing_in(my_proc_id, args.flops);
227     smpi_execute_flops(args.flops);
228     TRACE_smpi_computing_out(my_proc_id);
229   }
230 };
231
232 } // Replay Namespace
233
234 static void action_init(simgrid::xbt::ReplayAction& action)
235 {
236   XBT_DEBUG("Initialize the counters");
237   CHECK_ACTION_PARAMS(action, 0, 1)
238   if (action.size() > 2)
239     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
240   else
241     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
242
243   /* start a simulated timer */
244   smpi_process()->simulated_start();
245   /*initialize the number of active processes */
246   active_processes = smpi_process_count();
247
248   set_reqq_self(new std::vector<MPI_Request>);
249 }
250
251 static void action_finalize(simgrid::xbt::ReplayAction& action)
252 {
253   /* Nothing to do */
254 }
255
256 static void action_comm_size(simgrid::xbt::ReplayAction& action)
257 {
258   communicator_size = parse_double(action[2]);
259   log_timed_action (action, smpi_process()->simulated_elapsed());
260 }
261
262 static void action_comm_split(simgrid::xbt::ReplayAction& action)
263 {
264   log_timed_action (action, smpi_process()->simulated_elapsed());
265 }
266
267 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
268 {
269   log_timed_action (action, smpi_process()->simulated_elapsed());
270 }
271
272 static void action_compute(simgrid::xbt::ReplayAction& action)
273 {
274   Replay::ComputeAction().execute(action);
275 }
276
277 static void action_test(simgrid::xbt::ReplayAction& action)
278 {
279   CHECK_ACTION_PARAMS(action, 0, 0)
280   double clock = smpi_process()->simulated_elapsed();
281   MPI_Status status;
282
283   MPI_Request request = get_reqq_self()->back();
284   get_reqq_self()->pop_back();
285   //if request is null here, this may mean that a previous test has succeeded
286   //Different times in traced application and replayed version may lead to this
287   //In this case, ignore the extra calls.
288   if(request!=nullptr){
289     int my_proc_id = Actor::self()->getPid();
290     TRACE_smpi_testing_in(my_proc_id);
291
292     int flag = Request::test(&request, &status);
293
294     XBT_DEBUG("MPI_Test result: %d", flag);
295     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
296     get_reqq_self()->push_back(request);
297
298     TRACE_smpi_testing_out(my_proc_id);
299   }
300   log_timed_action (action, clock);
301 }
302
303 static void action_waitall(simgrid::xbt::ReplayAction& action)
304 {
305   CHECK_ACTION_PARAMS(action, 0, 0)
306   double clock = smpi_process()->simulated_elapsed();
307   const unsigned int count_requests = get_reqq_self()->size();
308
309   if (count_requests>0) {
310     MPI_Status status[count_requests];
311
312     int my_proc_id_traced = Actor::self()->getPid();
313     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
314                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
315     int recvs_snd[count_requests];
316     int recvs_rcv[count_requests];
317     for (unsigned int i = 0; i < count_requests; i++) {
318       const auto& req = (*get_reqq_self())[i];
319       if (req && (req->flags() & RECV)) {
320         recvs_snd[i] = req->src();
321         recvs_rcv[i] = req->dst();
322       } else
323         recvs_snd[i] = -100;
324    }
325    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
326
327    for (unsigned i = 0; i < count_requests; i++) {
328      if (recvs_snd[i]!=-100)
329        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
330    }
331    TRACE_smpi_comm_out(my_proc_id_traced);
332   }
333   log_timed_action (action, clock);
334 }
335
336 static void action_barrier(simgrid::xbt::ReplayAction& action)
337 {
338   double clock = smpi_process()->simulated_elapsed();
339   int my_proc_id = Actor::self()->getPid();
340   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
341
342   Colls::barrier(MPI_COMM_WORLD);
343
344   TRACE_smpi_comm_out(my_proc_id);
345   log_timed_action (action, clock);
346 }
347
348 static void action_bcast(simgrid::xbt::ReplayAction& action)
349 {
350   CHECK_ACTION_PARAMS(action, 1, 2)
351   double size = parse_double(action[2]);
352   double clock = smpi_process()->simulated_elapsed();
353   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
354   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
355   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
356
357   int my_proc_id = Actor::self()->getPid();
358   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
359                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
360                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
361
362   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
363
364   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
365
366   TRACE_smpi_comm_out(my_proc_id);
367   log_timed_action (action, clock);
368 }
369
370 static void action_reduce(simgrid::xbt::ReplayAction& action)
371 {
372   CHECK_ACTION_PARAMS(action, 2, 2)
373   double comm_size = parse_double(action[2]);
374   double comp_size = parse_double(action[3]);
375   double clock = smpi_process()->simulated_elapsed();
376   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
377
378   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
379
380   int my_proc_id = Actor::self()->getPid();
381   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
382                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
383                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
384
385   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
386   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
387   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
388   smpi_execute_flops(comp_size);
389
390   TRACE_smpi_comm_out(my_proc_id);
391   log_timed_action (action, clock);
392 }
393
394 static void action_allReduce(simgrid::xbt::ReplayAction& action)
395 {
396   CHECK_ACTION_PARAMS(action, 2, 1)
397   double comm_size = parse_double(action[2]);
398   double comp_size = parse_double(action[3]);
399
400   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
401
402   double clock = smpi_process()->simulated_elapsed();
403   int my_proc_id = Actor::self()->getPid();
404   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
405                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
406
407   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
408   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
409   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
410   smpi_execute_flops(comp_size);
411
412   TRACE_smpi_comm_out(my_proc_id);
413   log_timed_action (action, clock);
414 }
415
416 static void action_allToAll(simgrid::xbt::ReplayAction& action)
417 {
418   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
419   double clock = smpi_process()->simulated_elapsed();
420   unsigned long comm_size = MPI_COMM_WORLD->size();
421   int send_size = parse_double(action[2]);
422   int recv_size = parse_double(action[3]);
423   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
424   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
425
426   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
427   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
428
429   int my_proc_id = Actor::self()->getPid();
430   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
431                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
432                                                     Datatype::encode(MPI_CURRENT_TYPE),
433                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
434
435   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
436
437   TRACE_smpi_comm_out(my_proc_id);
438   log_timed_action (action, clock);
439 }
440
441 static void action_gather(simgrid::xbt::ReplayAction& action)
442 {
443   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
444         0 gather 68 68 0 0 0
445       where:
446         1) 68 is the sendcounts
447         2) 68 is the recvcounts
448         3) 0 is the root node
449         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
450         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
451   */
452   CHECK_ACTION_PARAMS(action, 2, 3)
453   double clock = smpi_process()->simulated_elapsed();
454   unsigned long comm_size = MPI_COMM_WORLD->size();
455   int send_size = parse_double(action[2]);
456   int recv_size = parse_double(action[3]);
457   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
458   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
459
460   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
461   void *recv = nullptr;
462   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
463   int rank = MPI_COMM_WORLD->rank();
464
465   if(rank==root)
466     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
467
468   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
469                                                                         Datatype::encode(MPI_CURRENT_TYPE),
470                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
471
472   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
473
474   TRACE_smpi_comm_out(Actor::self()->getPid());
475   log_timed_action (action, clock);
476 }
477
478 static void action_scatter(simgrid::xbt::ReplayAction& action)
479 {
480   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
481         0 gather 68 68 0 0 0
482       where:
483         1) 68 is the sendcounts
484         2) 68 is the recvcounts
485         3) 0 is the root node
486         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
487         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
488   */
489   CHECK_ACTION_PARAMS(action, 2, 3)
490   double clock                   = smpi_process()->simulated_elapsed();
491   unsigned long comm_size        = MPI_COMM_WORLD->size();
492   int send_size                  = parse_double(action[2]);
493   int recv_size                  = parse_double(action[3]);
494   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
495   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
496
497   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
498   void* recv = nullptr;
499   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
500   int rank = MPI_COMM_WORLD->rank();
501
502   if (rank == root)
503     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
504
505   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
506                                                                         Datatype::encode(MPI_CURRENT_TYPE),
507                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
508
509   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
510
511   TRACE_smpi_comm_out(Actor::self()->getPid());
512   log_timed_action(action, clock);
513 }
514
515 static void action_gatherv(simgrid::xbt::ReplayAction& action)
516 {
517   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
518        0 gather 68 68 10 10 10 0 0 0
519      where:
520        1) 68 is the sendcount
521        2) 68 10 10 10 is the recvcounts
522        3) 0 is the root node
523        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
524        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
525   */
526   double clock = smpi_process()->simulated_elapsed();
527   unsigned long comm_size = MPI_COMM_WORLD->size();
528   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
529   int send_size = parse_double(action[2]);
530   std::vector<int> disps(comm_size, 0);
531   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
532
533   MPI_Datatype MPI_CURRENT_TYPE =
534       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
535   MPI_Datatype MPI_CURRENT_TYPE2{
536       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
537
538   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
539   void *recv = nullptr;
540   for (unsigned int i = 0; i < comm_size; i++) {
541     (*recvcounts)[i] = std::stoi(action[i + 3]);
542   }
543   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
544
545   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
546   int rank = MPI_COMM_WORLD->rank();
547
548   if(rank==root)
549     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
550
551   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
552                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
553                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
554
555   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
556                  MPI_COMM_WORLD);
557
558   TRACE_smpi_comm_out(Actor::self()->getPid());
559   log_timed_action (action, clock);
560 }
561
562 static void action_scatterv(simgrid::xbt::ReplayAction& action)
563 {
564   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
565        0 gather 68 10 10 10 68 0 0 0
566      where:
567        1) 68 10 10 10 is the sendcounts
568        2) 68 is the recvcount
569        3) 0 is the root node
570        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
571        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
572   */
573   double clock  = smpi_process()->simulated_elapsed();
574   unsigned long comm_size = MPI_COMM_WORLD->size();
575   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
576   int recv_size = parse_double(action[2 + comm_size]);
577   std::vector<int> disps(comm_size, 0);
578   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
579
580   MPI_Datatype MPI_CURRENT_TYPE =
581       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
582   MPI_Datatype MPI_CURRENT_TYPE2{
583       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
584
585   void* send = nullptr;
586   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
587   for (unsigned int i = 0; i < comm_size; i++) {
588     (*sendcounts)[i] = std::stoi(action[i + 2]);
589   }
590   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
591
592   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
593   int rank = MPI_COMM_WORLD->rank();
594
595   if (rank == root)
596     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
597
598   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
599                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
600                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
601
602   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
603                   MPI_COMM_WORLD);
604
605   TRACE_smpi_comm_out(Actor::self()->getPid());
606   log_timed_action(action, clock);
607 }
608
609 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
610 {
611   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
612        0 reduceScatter 275427 275427 275427 204020 11346849 0
613      where:
614        1) The first four values after the name of the action declare the recvcounts array
615        2) The value 11346849 is the amount of instructions
616        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
617  */
618   double clock = smpi_process()->simulated_elapsed();
619   unsigned long comm_size = MPI_COMM_WORLD->size();
620   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
621   int comp_size = parse_double(action[2+comm_size]);
622   int my_proc_id                     = Actor::self()->getPid();
623   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
624   MPI_Datatype MPI_CURRENT_TYPE =
625       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
626
627   for (unsigned int i = 0; i < comm_size; i++) {
628     recvcounts->push_back(std::stoi(action[i + 2]));
629   }
630   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
631
632   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
633                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
634                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
635                                                        Datatype::encode(MPI_CURRENT_TYPE)));
636
637   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
638   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
639
640   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
641   smpi_execute_flops(comp_size);
642
643   TRACE_smpi_comm_out(my_proc_id);
644   log_timed_action (action, clock);
645 }
646
647 static void action_allgather(simgrid::xbt::ReplayAction& action)
648 {
649   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
650         0 allGather 275427 275427
651     where:
652         1) 275427 is the sendcount
653         2) 275427 is the recvcount
654         3) No more values mean that the datatype for sent and receive buffer is the default one, see
655     simgrid::smpi::Datatype::decode().
656   */
657   double clock = smpi_process()->simulated_elapsed();
658
659   CHECK_ACTION_PARAMS(action, 2, 2)
660   int sendcount = std::stoi(action[2]);
661   int recvcount = std::stoi(action[3]);
662
663   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
664   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
665
666   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
667   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
668
669   int my_proc_id = Actor::self()->getPid();
670
671   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
672                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
673                                                     Datatype::encode(MPI_CURRENT_TYPE),
674                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
675
676   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
677
678   TRACE_smpi_comm_out(my_proc_id);
679   log_timed_action (action, clock);
680 }
681
682 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
683 {
684   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
685         0 allGatherV 275427 275427 275427 275427 204020
686      where:
687         1) 275427 is the sendcount
688         2) The next four elements declare the recvcounts array
689         3) No more values mean that the datatype for sent and receive buffer is the default one, see
690      simgrid::smpi::Datatype::decode().
691   */
692   double clock = smpi_process()->simulated_elapsed();
693
694   unsigned long comm_size = MPI_COMM_WORLD->size();
695   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
696   int sendcount = std::stoi(action[2]);
697   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
698   std::vector<int> disps(comm_size, 0);
699
700   int datatype_index = 0, disp_index = 0;
701   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
702     datatype_index = 3 + comm_size;
703     disp_index     = datatype_index + 1;
704   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
705     datatype_index = -1;
706     disp_index     = 3 + comm_size;
707   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
708     datatype_index = 3 + comm_size;
709   }
710
711   if (disp_index != 0) {
712     for (unsigned int i = 0; i < comm_size; i++)
713       disps[i]          = std::stoi(action[disp_index + i]);
714   }
715
716   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
717                                                      : MPI_DEFAULT_TYPE};
718   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
719                                                       : MPI_DEFAULT_TYPE};
720
721   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
722
723   for (unsigned int i = 0; i < comm_size; i++) {
724     (*recvcounts)[i] = std::stoi(action[i + 3]);
725   }
726   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
727   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
728
729   int my_proc_id = Actor::self()->getPid();
730
731   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
732                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
733                                                        Datatype::encode(MPI_CURRENT_TYPE),
734                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
735
736   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
737                     MPI_COMM_WORLD);
738
739   TRACE_smpi_comm_out(my_proc_id);
740   log_timed_action (action, clock);
741 }
742
743 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
744 {
745   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
746         0 allToAllV 100 1 7 10 12 100 1 70 10 5
747      where:
748         1) 100 is the size of the send buffer *sizeof(int),
749         2) 1 7 10 12 is the sendcounts array
750         3) 100*sizeof(int) is the size of the receiver buffer
751         4)  1 70 10 5 is the recvcounts array
752   */
753   double clock = smpi_process()->simulated_elapsed();
754
755   unsigned long comm_size = MPI_COMM_WORLD->size();
756   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
757   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
758   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
759   std::vector<int> senddisps(comm_size, 0);
760   std::vector<int> recvdisps(comm_size, 0);
761
762   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
763                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
764                                       : MPI_DEFAULT_TYPE;
765   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
766                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
767                                      : MPI_DEFAULT_TYPE};
768
769   int send_buf_size=parse_double(action[2]);
770   int recv_buf_size=parse_double(action[3+comm_size]);
771   int my_proc_id = Actor::self()->getPid();
772   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
773   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
774
775   for (unsigned int i = 0; i < comm_size; i++) {
776     (*sendcounts)[i] = std::stoi(action[3 + i]);
777     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
778   }
779   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
780   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
781
782   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
783                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
784                                                        Datatype::encode(MPI_CURRENT_TYPE),
785                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
786
787   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
788                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
789
790   TRACE_smpi_comm_out(my_proc_id);
791   log_timed_action (action, clock);
792 }
793
794 }} // namespace simgrid::smpi
795
796 /** @brief Only initialize the replay, don't do it for real */
797 void smpi_replay_init(int* argc, char*** argv)
798 {
799   simgrid::smpi::Process::init(argc, argv);
800   smpi_process()->mark_as_initialized();
801   smpi_process()->set_replaying(true);
802
803   int my_proc_id = Actor::self()->getPid();
804   TRACE_smpi_init(my_proc_id);
805   TRACE_smpi_computing_init(my_proc_id);
806   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
807   TRACE_smpi_comm_out(my_proc_id);
808   xbt_replay_action_register("init",       simgrid::smpi::action_init);
809   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
810   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
811   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
812   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
813
814   std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
815   std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
816   std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
817   std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
818   std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
819
820   xbt_replay_action_register("send",
821                              std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
822   xbt_replay_action_register("Isend",
823                              std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
824   xbt_replay_action_register("recv",
825                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
826   xbt_replay_action_register("Irecv",
827                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
828   xbt_replay_action_register("test", simgrid::smpi::action_test);
829   xbt_replay_action_register("wait",
830                              std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
831   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
832   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
833   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
834   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
835   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
836   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
837   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
838   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
839   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
840   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
841   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
842   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
843   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
844   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
845   xbt_replay_action_register("compute", simgrid::smpi::action_compute);
846
847   //if we have a delayed start, sleep here.
848   if(*argc>2){
849     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
850     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
851     smpi_execute_flops(value);
852   } else {
853     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
854     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
855     smpi_execute_flops(0.0);
856   }
857 }
858
859 /** @brief actually run the replay after initialization */
860 void smpi_replay_main(int* argc, char*** argv)
861 {
862   simgrid::xbt::replay_runner(*argc, *argv);
863
864   /* and now, finalize everything */
865   /* One active process will stop. Decrease the counter*/
866   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
867   if (not get_reqq_self()->empty()) {
868     unsigned int count_requests=get_reqq_self()->size();
869     MPI_Request requests[count_requests];
870     MPI_Status status[count_requests];
871     unsigned int i=0;
872
873     for (auto const& req : *get_reqq_self()) {
874       requests[i] = req;
875       i++;
876     }
877     simgrid::smpi::Request::waitall(count_requests, requests, status);
878   }
879   delete get_reqq_self();
880   active_processes--;
881
882   if(active_processes==0){
883     /* Last process alive speaking: end the simulated timer */
884     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
885     smpi_free_replay_tmp_buffers();
886   }
887
888   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
889
890   smpi_process()->finalize();
891
892   TRACE_smpi_comm_out(Actor::self()->getPid());
893   TRACE_smpi_finalize(Actor::self()->getPid());
894 }
895
896 /** @brief chain a replay initialization and a replay start */
897 void smpi_replay_run(int* argc, char*** argv)
898 {
899   smpi_replay_init(argc, argv);
900   smpi_replay_main(argc, argv);
901 }