Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
73b66cf273de7fc44acb27f66efa5c75fd0afa46
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 template <class T> class ReplayAction {
104 protected:
105   const std::string name;
106   T args;
107
108   int my_proc_id;
109
110 public:
111   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
112
113   virtual void execute(simgrid::xbt::ReplayAction& action)
114   {
115     // Needs to be re-initialized for every action, hence here
116     double start_time = smpi_process()->simulated_elapsed();
117     args.parse(action);
118     kernel(action);
119     if (name != "Init")
120       log_timed_action(action, start_time);
121   }
122
123   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
124 };
125
126 class WaitAction : public ReplayAction<ActionArgParser> {
127 public:
128   WaitAction() : ReplayAction("Wait") {}
129   void kernel(simgrid::xbt::ReplayAction& action) override
130   {
131     CHECK_ACTION_PARAMS(action, 0, 0)
132     MPI_Status status;
133
134     std::string s = boost::algorithm::join(action, " ");
135     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
136     MPI_Request request = get_reqq_self()->back();
137     get_reqq_self()->pop_back();
138
139     if (request == nullptr) {
140       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
141        * return.*/
142       return;
143     }
144
145     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
146
147     // Must be taken before Request::wait() since the request may be set to
148     // MPI_REQUEST_NULL by Request::wait!
149     int src                  = request->comm()->group()->rank(request->src());
150     int dst                  = request->comm()->group()->rank(request->dst());
151     bool is_wait_for_receive = (request->flags() & RECV);
152     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
153     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
154
155     Request::wait(&request, &status);
156
157     TRACE_smpi_comm_out(rank);
158     if (is_wait_for_receive)
159       TRACE_smpi_recv(src, dst, 0);
160   }
161 };
162
163 class SendAction : public ReplayAction<SendRecvParser> {
164 public:
165   SendAction() = delete;
166   SendAction(std::string name) : ReplayAction(name) {}
167   void kernel(simgrid::xbt::ReplayAction& action) override
168   {
169     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
170
171     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
172                                                                                  Datatype::encode(args.datatype1)));
173     if (not TRACE_smpi_view_internals())
174       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
175
176     if (name == "send") {
177       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
178     } else if (name == "Isend") {
179       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
180       get_reqq_self()->push_back(request);
181     } else {
182       xbt_die("Don't know this action, %s", name.c_str());
183     }
184
185     TRACE_smpi_comm_out(my_proc_id);
186   }
187 };
188
189 class RecvAction : public ReplayAction<SendRecvParser> {
190 public:
191   RecvAction() = delete;
192   explicit RecvAction(std::string name) : ReplayAction(name) {}
193   void kernel(simgrid::xbt::ReplayAction& action) override
194   {
195     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
196
197     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
198                                                                                  Datatype::encode(args.datatype1)));
199
200     MPI_Status status;
201     // unknown size from the receiver point of view
202     if (args.size <= 0.0) {
203       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
204       args.size = status.count;
205     }
206
207     if (name == "recv") {
208       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
209     } else if (name == "Irecv") {
210       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
211       get_reqq_self()->push_back(request);
212     }
213
214     TRACE_smpi_comm_out(my_proc_id);
215     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
216     if (name == "recv" && not TRACE_smpi_view_internals()) {
217       TRACE_smpi_recv(src_traced, my_proc_id, 0);
218     }
219   }
220 };
221
222 class ComputeAction : public ReplayAction<ComputeParser> {
223 public:
224   ComputeAction() : ReplayAction("compute") {}
225   void kernel(simgrid::xbt::ReplayAction& action) override
226   {
227     TRACE_smpi_computing_in(my_proc_id, args.flops);
228     smpi_execute_flops(args.flops);
229     TRACE_smpi_computing_out(my_proc_id);
230   }
231 };
232
233 class TestAction : public ReplayAction<ActionArgParser> {
234 public:
235   TestAction() : ReplayAction("Test") {}
236   void kernel(simgrid::xbt::ReplayAction& action) override
237   {
238     MPI_Request request = get_reqq_self()->back();
239     get_reqq_self()->pop_back();
240     // if request is null here, this may mean that a previous test has succeeded
241     // Different times in traced application and replayed version may lead to this
242     // In this case, ignore the extra calls.
243     if (request != nullptr) {
244       TRACE_smpi_testing_in(my_proc_id);
245
246       MPI_Status status;
247       int flag = Request::test(&request, &status);
248
249       XBT_DEBUG("MPI_Test result: %d", flag);
250       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
251        * nullptr.*/
252       get_reqq_self()->push_back(request);
253
254       TRACE_smpi_testing_out(my_proc_id);
255     }
256   }
257 };
258
259 class InitAction : public ReplayAction<ActionArgParser> {
260 public:
261   InitAction() : ReplayAction("Init") {}
262   void kernel(simgrid::xbt::ReplayAction& action) override
263   {
264     CHECK_ACTION_PARAMS(action, 0, 1)
265     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
266                                            : MPI_BYTE;  // default TAU datatype
267
268     /* start a simulated timer */
269     smpi_process()->simulated_start();
270     /*initialize the number of active processes */
271     active_processes = smpi_process_count();
272
273     set_reqq_self(new std::vector<MPI_Request>);
274   }
275 };
276
277 } // Replay Namespace
278
279 static void action_finalize(simgrid::xbt::ReplayAction& action)
280 {
281   /* Nothing to do */
282 }
283
284 static void action_comm_size(simgrid::xbt::ReplayAction& action)
285 {
286   log_timed_action (action, smpi_process()->simulated_elapsed());
287 }
288
289 static void action_comm_split(simgrid::xbt::ReplayAction& action)
290 {
291   log_timed_action (action, smpi_process()->simulated_elapsed());
292 }
293
294 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
295 {
296   log_timed_action (action, smpi_process()->simulated_elapsed());
297 }
298
299 static void action_waitall(simgrid::xbt::ReplayAction& action)
300 {
301   CHECK_ACTION_PARAMS(action, 0, 0)
302   double clock = smpi_process()->simulated_elapsed();
303   const unsigned int count_requests = get_reqq_self()->size();
304
305   if (count_requests>0) {
306     MPI_Status status[count_requests];
307
308     int my_proc_id_traced = Actor::self()->getPid();
309     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
310                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
311     int recvs_snd[count_requests];
312     int recvs_rcv[count_requests];
313     for (unsigned int i = 0; i < count_requests; i++) {
314       const auto& req = (*get_reqq_self())[i];
315       if (req && (req->flags() & RECV)) {
316         recvs_snd[i] = req->src();
317         recvs_rcv[i] = req->dst();
318       } else
319         recvs_snd[i] = -100;
320    }
321    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
322
323    for (unsigned i = 0; i < count_requests; i++) {
324      if (recvs_snd[i]!=-100)
325        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
326    }
327    TRACE_smpi_comm_out(my_proc_id_traced);
328   }
329   log_timed_action (action, clock);
330 }
331
332 static void action_barrier(simgrid::xbt::ReplayAction& action)
333 {
334   double clock = smpi_process()->simulated_elapsed();
335   int my_proc_id = Actor::self()->getPid();
336   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
337
338   Colls::barrier(MPI_COMM_WORLD);
339
340   TRACE_smpi_comm_out(my_proc_id);
341   log_timed_action (action, clock);
342 }
343
344 static void action_bcast(simgrid::xbt::ReplayAction& action)
345 {
346   CHECK_ACTION_PARAMS(action, 1, 2)
347   double size = parse_double(action[2]);
348   double clock = smpi_process()->simulated_elapsed();
349   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
350   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
351   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
352
353   int my_proc_id = Actor::self()->getPid();
354   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
355                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
356                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
357
358   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
359
360   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
361
362   TRACE_smpi_comm_out(my_proc_id);
363   log_timed_action (action, clock);
364 }
365
366 static void action_reduce(simgrid::xbt::ReplayAction& action)
367 {
368   CHECK_ACTION_PARAMS(action, 2, 2)
369   double comm_size = parse_double(action[2]);
370   double comp_size = parse_double(action[3]);
371   double clock = smpi_process()->simulated_elapsed();
372   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
373
374   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
375
376   int my_proc_id = Actor::self()->getPid();
377   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
378                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
379                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
380
381   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
382   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
383   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
384   smpi_execute_flops(comp_size);
385
386   TRACE_smpi_comm_out(my_proc_id);
387   log_timed_action (action, clock);
388 }
389
390 static void action_allReduce(simgrid::xbt::ReplayAction& action)
391 {
392   CHECK_ACTION_PARAMS(action, 2, 1)
393   double comm_size = parse_double(action[2]);
394   double comp_size = parse_double(action[3]);
395
396   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
397
398   double clock = smpi_process()->simulated_elapsed();
399   int my_proc_id = Actor::self()->getPid();
400   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
401                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
402
403   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
404   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
405   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
406   smpi_execute_flops(comp_size);
407
408   TRACE_smpi_comm_out(my_proc_id);
409   log_timed_action (action, clock);
410 }
411
412 static void action_allToAll(simgrid::xbt::ReplayAction& action)
413 {
414   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
415   double clock = smpi_process()->simulated_elapsed();
416   unsigned long comm_size = MPI_COMM_WORLD->size();
417   int send_size = parse_double(action[2]);
418   int recv_size = parse_double(action[3]);
419   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
420   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
421
422   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
423   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
424
425   int my_proc_id = Actor::self()->getPid();
426   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
427                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
428                                                     Datatype::encode(MPI_CURRENT_TYPE),
429                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
430
431   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
432
433   TRACE_smpi_comm_out(my_proc_id);
434   log_timed_action (action, clock);
435 }
436
437 static void action_gather(simgrid::xbt::ReplayAction& action)
438 {
439   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
440         0 gather 68 68 0 0 0
441       where:
442         1) 68 is the sendcounts
443         2) 68 is the recvcounts
444         3) 0 is the root node
445         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
446         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
447   */
448   CHECK_ACTION_PARAMS(action, 2, 3)
449   double clock = smpi_process()->simulated_elapsed();
450   unsigned long comm_size = MPI_COMM_WORLD->size();
451   int send_size = parse_double(action[2]);
452   int recv_size = parse_double(action[3]);
453   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
454   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
455
456   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
457   void *recv = nullptr;
458   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
459   int rank = MPI_COMM_WORLD->rank();
460
461   if(rank==root)
462     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
463
464   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
465                                                                         Datatype::encode(MPI_CURRENT_TYPE),
466                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
467
468   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
469
470   TRACE_smpi_comm_out(Actor::self()->getPid());
471   log_timed_action (action, clock);
472 }
473
474 static void action_scatter(simgrid::xbt::ReplayAction& action)
475 {
476   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
477         0 gather 68 68 0 0 0
478       where:
479         1) 68 is the sendcounts
480         2) 68 is the recvcounts
481         3) 0 is the root node
482         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
483         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
484   */
485   CHECK_ACTION_PARAMS(action, 2, 3)
486   double clock                   = smpi_process()->simulated_elapsed();
487   unsigned long comm_size        = MPI_COMM_WORLD->size();
488   int send_size                  = parse_double(action[2]);
489   int recv_size                  = parse_double(action[3]);
490   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
491   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
492
493   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
494   void* recv = nullptr;
495   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
496   int rank = MPI_COMM_WORLD->rank();
497
498   if (rank == root)
499     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
500
501   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
502                                                                         Datatype::encode(MPI_CURRENT_TYPE),
503                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
504
505   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
506
507   TRACE_smpi_comm_out(Actor::self()->getPid());
508   log_timed_action(action, clock);
509 }
510
511 static void action_gatherv(simgrid::xbt::ReplayAction& action)
512 {
513   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
514        0 gather 68 68 10 10 10 0 0 0
515      where:
516        1) 68 is the sendcount
517        2) 68 10 10 10 is the recvcounts
518        3) 0 is the root node
519        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
520        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
521   */
522   double clock = smpi_process()->simulated_elapsed();
523   unsigned long comm_size = MPI_COMM_WORLD->size();
524   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
525   int send_size = parse_double(action[2]);
526   std::vector<int> disps(comm_size, 0);
527   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
528
529   MPI_Datatype MPI_CURRENT_TYPE =
530       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
531   MPI_Datatype MPI_CURRENT_TYPE2{
532       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
533
534   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
535   void *recv = nullptr;
536   for (unsigned int i = 0; i < comm_size; i++) {
537     (*recvcounts)[i] = std::stoi(action[i + 3]);
538   }
539   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
540
541   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
542   int rank = MPI_COMM_WORLD->rank();
543
544   if(rank==root)
545     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
546
547   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
548                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
549                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
550
551   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
552                  MPI_COMM_WORLD);
553
554   TRACE_smpi_comm_out(Actor::self()->getPid());
555   log_timed_action (action, clock);
556 }
557
558 static void action_scatterv(simgrid::xbt::ReplayAction& action)
559 {
560   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
561        0 gather 68 10 10 10 68 0 0 0
562      where:
563        1) 68 10 10 10 is the sendcounts
564        2) 68 is the recvcount
565        3) 0 is the root node
566        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
567        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
568   */
569   double clock  = smpi_process()->simulated_elapsed();
570   unsigned long comm_size = MPI_COMM_WORLD->size();
571   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
572   int recv_size = parse_double(action[2 + comm_size]);
573   std::vector<int> disps(comm_size, 0);
574   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
575
576   MPI_Datatype MPI_CURRENT_TYPE =
577       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
578   MPI_Datatype MPI_CURRENT_TYPE2{
579       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
580
581   void* send = nullptr;
582   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
583   for (unsigned int i = 0; i < comm_size; i++) {
584     (*sendcounts)[i] = std::stoi(action[i + 2]);
585   }
586   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
587
588   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
589   int rank = MPI_COMM_WORLD->rank();
590
591   if (rank == root)
592     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
593
594   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
595                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
596                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
597
598   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
599                   MPI_COMM_WORLD);
600
601   TRACE_smpi_comm_out(Actor::self()->getPid());
602   log_timed_action(action, clock);
603 }
604
605 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
606 {
607   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
608        0 reduceScatter 275427 275427 275427 204020 11346849 0
609      where:
610        1) The first four values after the name of the action declare the recvcounts array
611        2) The value 11346849 is the amount of instructions
612        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
613  */
614   double clock = smpi_process()->simulated_elapsed();
615   unsigned long comm_size = MPI_COMM_WORLD->size();
616   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
617   int comp_size = parse_double(action[2+comm_size]);
618   int my_proc_id                     = Actor::self()->getPid();
619   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
620   MPI_Datatype MPI_CURRENT_TYPE =
621       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
622
623   for (unsigned int i = 0; i < comm_size; i++) {
624     recvcounts->push_back(std::stoi(action[i + 2]));
625   }
626   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
627
628   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
629                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
630                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
631                                                        Datatype::encode(MPI_CURRENT_TYPE)));
632
633   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
634   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
635
636   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
637   smpi_execute_flops(comp_size);
638
639   TRACE_smpi_comm_out(my_proc_id);
640   log_timed_action (action, clock);
641 }
642
643 static void action_allgather(simgrid::xbt::ReplayAction& action)
644 {
645   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
646         0 allGather 275427 275427
647     where:
648         1) 275427 is the sendcount
649         2) 275427 is the recvcount
650         3) No more values mean that the datatype for sent and receive buffer is the default one, see
651     simgrid::smpi::Datatype::decode().
652   */
653   double clock = smpi_process()->simulated_elapsed();
654
655   CHECK_ACTION_PARAMS(action, 2, 2)
656   int sendcount = std::stoi(action[2]);
657   int recvcount = std::stoi(action[3]);
658
659   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
660   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
661
662   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
663   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
664
665   int my_proc_id = Actor::self()->getPid();
666
667   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
668                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
669                                                     Datatype::encode(MPI_CURRENT_TYPE),
670                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
671
672   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
673
674   TRACE_smpi_comm_out(my_proc_id);
675   log_timed_action (action, clock);
676 }
677
678 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
679 {
680   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
681         0 allGatherV 275427 275427 275427 275427 204020
682      where:
683         1) 275427 is the sendcount
684         2) The next four elements declare the recvcounts array
685         3) No more values mean that the datatype for sent and receive buffer is the default one, see
686      simgrid::smpi::Datatype::decode().
687   */
688   double clock = smpi_process()->simulated_elapsed();
689
690   unsigned long comm_size = MPI_COMM_WORLD->size();
691   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
692   int sendcount = std::stoi(action[2]);
693   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
694   std::vector<int> disps(comm_size, 0);
695
696   int datatype_index = 0, disp_index = 0;
697   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
698     datatype_index = 3 + comm_size;
699     disp_index     = datatype_index + 1;
700   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
701     datatype_index = -1;
702     disp_index     = 3 + comm_size;
703   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
704     datatype_index = 3 + comm_size;
705   }
706
707   if (disp_index != 0) {
708     for (unsigned int i = 0; i < comm_size; i++)
709       disps[i]          = std::stoi(action[disp_index + i]);
710   }
711
712   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
713                                                      : MPI_DEFAULT_TYPE};
714   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
715                                                       : MPI_DEFAULT_TYPE};
716
717   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
718
719   for (unsigned int i = 0; i < comm_size; i++) {
720     (*recvcounts)[i] = std::stoi(action[i + 3]);
721   }
722   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
723   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
724
725   int my_proc_id = Actor::self()->getPid();
726
727   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
728                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
729                                                        Datatype::encode(MPI_CURRENT_TYPE),
730                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
731
732   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
733                     MPI_COMM_WORLD);
734
735   TRACE_smpi_comm_out(my_proc_id);
736   log_timed_action (action, clock);
737 }
738
739 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
740 {
741   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
742         0 allToAllV 100 1 7 10 12 100 1 70 10 5
743      where:
744         1) 100 is the size of the send buffer *sizeof(int),
745         2) 1 7 10 12 is the sendcounts array
746         3) 100*sizeof(int) is the size of the receiver buffer
747         4)  1 70 10 5 is the recvcounts array
748   */
749   double clock = smpi_process()->simulated_elapsed();
750
751   unsigned long comm_size = MPI_COMM_WORLD->size();
752   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
753   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
754   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
755   std::vector<int> senddisps(comm_size, 0);
756   std::vector<int> recvdisps(comm_size, 0);
757
758   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
759                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
760                                       : MPI_DEFAULT_TYPE;
761   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
762                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
763                                      : MPI_DEFAULT_TYPE};
764
765   int send_buf_size=parse_double(action[2]);
766   int recv_buf_size=parse_double(action[3+comm_size]);
767   int my_proc_id = Actor::self()->getPid();
768   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
769   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
770
771   for (unsigned int i = 0; i < comm_size; i++) {
772     (*sendcounts)[i] = std::stoi(action[3 + i]);
773     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
774   }
775   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
776   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
777
778   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
779                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
780                                                        Datatype::encode(MPI_CURRENT_TYPE),
781                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
782
783   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
784                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
785
786   TRACE_smpi_comm_out(my_proc_id);
787   log_timed_action (action, clock);
788 }
789
790 }} // namespace simgrid::smpi
791
792 /** @brief Only initialize the replay, don't do it for real */
793 void smpi_replay_init(int* argc, char*** argv)
794 {
795   simgrid::smpi::Process::init(argc, argv);
796   smpi_process()->mark_as_initialized();
797   smpi_process()->set_replaying(true);
798
799   int my_proc_id = Actor::self()->getPid();
800   TRACE_smpi_init(my_proc_id);
801   TRACE_smpi_computing_init(my_proc_id);
802   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
803   TRACE_smpi_comm_out(my_proc_id);
804   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
805   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
806   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
807   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
808   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
809
810   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
811   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
812   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
813   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
814   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
815   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
816   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
817   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
818   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
819   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
820   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
821   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
822   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
823   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
824   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
825   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
826   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
827   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
828   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
829   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
830   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
831
832   //if we have a delayed start, sleep here.
833   if(*argc>2){
834     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
835     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
836     smpi_execute_flops(value);
837   } else {
838     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
839     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
840     smpi_execute_flops(0.0);
841   }
842 }
843
844 /** @brief actually run the replay after initialization */
845 void smpi_replay_main(int* argc, char*** argv)
846 {
847   simgrid::xbt::replay_runner(*argc, *argv);
848
849   /* and now, finalize everything */
850   /* One active process will stop. Decrease the counter*/
851   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
852   if (not get_reqq_self()->empty()) {
853     unsigned int count_requests=get_reqq_self()->size();
854     MPI_Request requests[count_requests];
855     MPI_Status status[count_requests];
856     unsigned int i=0;
857
858     for (auto const& req : *get_reqq_self()) {
859       requests[i] = req;
860       i++;
861     }
862     simgrid::smpi::Request::waitall(count_requests, requests, status);
863   }
864   delete get_reqq_self();
865   active_processes--;
866
867   if(active_processes==0){
868     /* Last process alive speaking: end the simulated timer */
869     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
870     smpi_free_replay_tmp_buffers();
871   }
872
873   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
874
875   smpi_process()->finalize();
876
877   TRACE_smpi_comm_out(Actor::self()->getPid());
878   TRACE_smpi_finalize(Actor::self()->getPid());
879 }
880
881 /** @brief chain a replay initialization and a replay start */
882 void smpi_replay_run(int* argc, char*** argv)
883 {
884   smpi_replay_init(argc, argv);
885   smpi_replay_main(argc, argv);
886 }