Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
d89a3fab25356b0be589436e223c6e726808e0c2
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 template <class T> class ReplayAction {
104 protected:
105   const std::string name;
106   T args;
107
108   int my_proc_id;
109
110 public:
111   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
112
113   virtual void execute(simgrid::xbt::ReplayAction& action)
114   {
115     // Needs to be re-initialized for every action, hence here
116     double start_time = smpi_process()->simulated_elapsed();
117     args.parse(action);
118     kernel(action);
119     if (name != "Init")
120       log_timed_action(action, start_time);
121   }
122
123   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
124 };
125
126 class WaitAction : public ReplayAction<ActionArgParser> {
127 public:
128   WaitAction() : ReplayAction("Wait") {}
129   void kernel(simgrid::xbt::ReplayAction& action) override
130   {
131     CHECK_ACTION_PARAMS(action, 0, 0)
132     MPI_Status status;
133
134     std::string s = boost::algorithm::join(action, " ");
135     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
136     MPI_Request request = get_reqq_self()->back();
137     get_reqq_self()->pop_back();
138
139     if (request == nullptr) {
140       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
141        * return.*/
142       return;
143     }
144
145     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
146
147     // Must be taken before Request::wait() since the request may be set to
148     // MPI_REQUEST_NULL by Request::wait!
149     int src                  = request->comm()->group()->rank(request->src());
150     int dst                  = request->comm()->group()->rank(request->dst());
151     bool is_wait_for_receive = (request->flags() & RECV);
152     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
153     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
154
155     Request::wait(&request, &status);
156
157     TRACE_smpi_comm_out(rank);
158     if (is_wait_for_receive)
159       TRACE_smpi_recv(src, dst, 0);
160   }
161 };
162
163 class SendAction : public ReplayAction<SendRecvParser> {
164 public:
165   SendAction() = delete;
166   SendAction(std::string name) : ReplayAction(name) {}
167   void kernel(simgrid::xbt::ReplayAction& action) override
168   {
169     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
170
171     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
172                                                                                  Datatype::encode(args.datatype1)));
173     if (not TRACE_smpi_view_internals())
174       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
175
176     if (name == "send") {
177       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
178     } else if (name == "Isend") {
179       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
180       get_reqq_self()->push_back(request);
181     } else {
182       xbt_die("Don't know this action, %s", name.c_str());
183     }
184
185     TRACE_smpi_comm_out(my_proc_id);
186   }
187 };
188
189 class RecvAction : public ReplayAction<SendRecvParser> {
190 public:
191   RecvAction() = delete;
192   explicit RecvAction(std::string name) : ReplayAction(name) {}
193   void kernel(simgrid::xbt::ReplayAction& action) override
194   {
195     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
196
197     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
198                                                                                  Datatype::encode(args.datatype1)));
199
200     MPI_Status status;
201     // unknown size from the receiver point of view
202     if (args.size <= 0.0) {
203       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
204       args.size = status.count;
205     }
206
207     if (name == "recv") {
208       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
209     } else if (name == "Irecv") {
210       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
211       get_reqq_self()->push_back(request);
212     }
213
214     TRACE_smpi_comm_out(my_proc_id);
215     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
216     if (name == "recv" && not TRACE_smpi_view_internals()) {
217       TRACE_smpi_recv(src_traced, my_proc_id, 0);
218     }
219   }
220 };
221
222 class ComputeAction : public ReplayAction<ComputeParser> {
223 public:
224   ComputeAction() : ReplayAction("compute") {}
225   void kernel(simgrid::xbt::ReplayAction& action) override
226   {
227     TRACE_smpi_computing_in(my_proc_id, args.flops);
228     smpi_execute_flops(args.flops);
229     TRACE_smpi_computing_out(my_proc_id);
230   }
231 };
232
233 class TestAction : public ReplayAction<ActionArgParser> {
234 public:
235   TestAction() : ReplayAction("Test") {}
236   void kernel(simgrid::xbt::ReplayAction& action) override
237   {
238     MPI_Request request = get_reqq_self()->back();
239     get_reqq_self()->pop_back();
240     // if request is null here, this may mean that a previous test has succeeded
241     // Different times in traced application and replayed version may lead to this
242     // In this case, ignore the extra calls.
243     if (request != nullptr) {
244       TRACE_smpi_testing_in(my_proc_id);
245
246       MPI_Status status;
247       int flag = Request::test(&request, &status);
248
249       XBT_DEBUG("MPI_Test result: %d", flag);
250       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
251        * nullptr.*/
252       get_reqq_self()->push_back(request);
253
254       TRACE_smpi_testing_out(my_proc_id);
255     }
256   }
257 };
258
259 class InitAction : public ReplayAction<ActionArgParser> {
260 public:
261   InitAction() : ReplayAction("Init") {}
262   void kernel(simgrid::xbt::ReplayAction& action) override
263   {
264     CHECK_ACTION_PARAMS(action, 0, 1)
265     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
266                                            : MPI_BYTE;  // default TAU datatype
267
268     /* start a simulated timer */
269     smpi_process()->simulated_start();
270     /*initialize the number of active processes */
271     active_processes = smpi_process_count();
272
273     set_reqq_self(new std::vector<MPI_Request>);
274   }
275 };
276
277 class CommunicatorAction : public ReplayAction<ActionArgParser> {
278 public:
279   CommunicatorAction() : ReplayAction("Comm") {}
280   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
281 };
282
283 class WaitAllAction : public ReplayAction<ActionArgParser> {
284 public:
285   WaitAllAction() : ReplayAction("waitAll") {}
286   void kernel(simgrid::xbt::ReplayAction& action) override
287   {
288     const unsigned int count_requests = get_reqq_self()->size();
289
290     if (count_requests > 0) {
291       TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
292                          new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
293       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
294       for (const auto& req : (*get_reqq_self())) {
295         if (req && (req->flags() & RECV)) {
296           sender_receiver.push_back({req->src(), req->dst()});
297         }
298       }
299       MPI_Status status[count_requests];
300       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
301
302       for (auto& pair : sender_receiver) {
303         TRACE_smpi_recv(pair.first, pair.second, 0);
304       }
305       TRACE_smpi_comm_out(my_proc_id);
306     }
307   }
308 };
309
310 class BarrierAction : public ReplayAction<ActionArgParser> {
311 public:
312   BarrierAction() : ReplayAction("barrier") {}
313   void kernel(simgrid::xbt::ReplayAction& action) override
314   {
315     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
316     Colls::barrier(MPI_COMM_WORLD);
317     TRACE_smpi_comm_out(my_proc_id);
318   }
319 };
320
321 } // Replay Namespace
322
323 static void action_bcast(simgrid::xbt::ReplayAction& action)
324 {
325   CHECK_ACTION_PARAMS(action, 1, 2)
326   double size = parse_double(action[2]);
327   double clock = smpi_process()->simulated_elapsed();
328   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
329   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
330   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
331
332   int my_proc_id = Actor::self()->getPid();
333   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
334                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
335                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
336
337   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
338
339   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
340
341   TRACE_smpi_comm_out(my_proc_id);
342   log_timed_action (action, clock);
343 }
344
345 static void action_reduce(simgrid::xbt::ReplayAction& action)
346 {
347   CHECK_ACTION_PARAMS(action, 2, 2)
348   double comm_size = parse_double(action[2]);
349   double comp_size = parse_double(action[3]);
350   double clock = smpi_process()->simulated_elapsed();
351   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
352
353   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
354
355   int my_proc_id = Actor::self()->getPid();
356   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
357                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
358                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
359
360   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
361   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
362   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
363   smpi_execute_flops(comp_size);
364
365   TRACE_smpi_comm_out(my_proc_id);
366   log_timed_action (action, clock);
367 }
368
369 static void action_allReduce(simgrid::xbt::ReplayAction& action)
370 {
371   CHECK_ACTION_PARAMS(action, 2, 1)
372   double comm_size = parse_double(action[2]);
373   double comp_size = parse_double(action[3]);
374
375   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
376
377   double clock = smpi_process()->simulated_elapsed();
378   int my_proc_id = Actor::self()->getPid();
379   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
380                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
381
382   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
383   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
384   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
385   smpi_execute_flops(comp_size);
386
387   TRACE_smpi_comm_out(my_proc_id);
388   log_timed_action (action, clock);
389 }
390
391 static void action_allToAll(simgrid::xbt::ReplayAction& action)
392 {
393   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
394   double clock = smpi_process()->simulated_elapsed();
395   unsigned long comm_size = MPI_COMM_WORLD->size();
396   int send_size = parse_double(action[2]);
397   int recv_size = parse_double(action[3]);
398   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
399   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
400
401   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
402   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
403
404   int my_proc_id = Actor::self()->getPid();
405   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
406                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
407                                                     Datatype::encode(MPI_CURRENT_TYPE),
408                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
409
410   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
411
412   TRACE_smpi_comm_out(my_proc_id);
413   log_timed_action (action, clock);
414 }
415
416 static void action_gather(simgrid::xbt::ReplayAction& action)
417 {
418   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
419         0 gather 68 68 0 0 0
420       where:
421         1) 68 is the sendcounts
422         2) 68 is the recvcounts
423         3) 0 is the root node
424         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
425         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
426   */
427   CHECK_ACTION_PARAMS(action, 2, 3)
428   double clock = smpi_process()->simulated_elapsed();
429   unsigned long comm_size = MPI_COMM_WORLD->size();
430   int send_size = parse_double(action[2]);
431   int recv_size = parse_double(action[3]);
432   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
433   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
434
435   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
436   void *recv = nullptr;
437   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
438   int rank = MPI_COMM_WORLD->rank();
439
440   if(rank==root)
441     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
442
443   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
444                                                                         Datatype::encode(MPI_CURRENT_TYPE),
445                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
446
447   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
448
449   TRACE_smpi_comm_out(Actor::self()->getPid());
450   log_timed_action (action, clock);
451 }
452
453 static void action_scatter(simgrid::xbt::ReplayAction& action)
454 {
455   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
456         0 gather 68 68 0 0 0
457       where:
458         1) 68 is the sendcounts
459         2) 68 is the recvcounts
460         3) 0 is the root node
461         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
462         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
463   */
464   CHECK_ACTION_PARAMS(action, 2, 3)
465   double clock                   = smpi_process()->simulated_elapsed();
466   unsigned long comm_size        = MPI_COMM_WORLD->size();
467   int send_size                  = parse_double(action[2]);
468   int recv_size                  = parse_double(action[3]);
469   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
470   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
471
472   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
473   void* recv = nullptr;
474   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
475   int rank = MPI_COMM_WORLD->rank();
476
477   if (rank == root)
478     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
479
480   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
481                                                                         Datatype::encode(MPI_CURRENT_TYPE),
482                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
483
484   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
485
486   TRACE_smpi_comm_out(Actor::self()->getPid());
487   log_timed_action(action, clock);
488 }
489
490 static void action_gatherv(simgrid::xbt::ReplayAction& action)
491 {
492   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
493        0 gather 68 68 10 10 10 0 0 0
494      where:
495        1) 68 is the sendcount
496        2) 68 10 10 10 is the recvcounts
497        3) 0 is the root node
498        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
499        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
500   */
501   double clock = smpi_process()->simulated_elapsed();
502   unsigned long comm_size = MPI_COMM_WORLD->size();
503   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
504   int send_size = parse_double(action[2]);
505   std::vector<int> disps(comm_size, 0);
506   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
507
508   MPI_Datatype MPI_CURRENT_TYPE =
509       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
510   MPI_Datatype MPI_CURRENT_TYPE2{
511       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
512
513   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
514   void *recv = nullptr;
515   for (unsigned int i = 0; i < comm_size; i++) {
516     (*recvcounts)[i] = std::stoi(action[i + 3]);
517   }
518   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
519
520   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
521   int rank = MPI_COMM_WORLD->rank();
522
523   if(rank==root)
524     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
525
526   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
527                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
528                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
529
530   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
531                  MPI_COMM_WORLD);
532
533   TRACE_smpi_comm_out(Actor::self()->getPid());
534   log_timed_action (action, clock);
535 }
536
537 static void action_scatterv(simgrid::xbt::ReplayAction& action)
538 {
539   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
540        0 gather 68 10 10 10 68 0 0 0
541      where:
542        1) 68 10 10 10 is the sendcounts
543        2) 68 is the recvcount
544        3) 0 is the root node
545        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
546        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
547   */
548   double clock  = smpi_process()->simulated_elapsed();
549   unsigned long comm_size = MPI_COMM_WORLD->size();
550   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
551   int recv_size = parse_double(action[2 + comm_size]);
552   std::vector<int> disps(comm_size, 0);
553   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
554
555   MPI_Datatype MPI_CURRENT_TYPE =
556       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
557   MPI_Datatype MPI_CURRENT_TYPE2{
558       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
559
560   void* send = nullptr;
561   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
562   for (unsigned int i = 0; i < comm_size; i++) {
563     (*sendcounts)[i] = std::stoi(action[i + 2]);
564   }
565   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
566
567   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
568   int rank = MPI_COMM_WORLD->rank();
569
570   if (rank == root)
571     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
572
573   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
574                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
575                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
576
577   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
578                   MPI_COMM_WORLD);
579
580   TRACE_smpi_comm_out(Actor::self()->getPid());
581   log_timed_action(action, clock);
582 }
583
584 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
585 {
586   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
587        0 reduceScatter 275427 275427 275427 204020 11346849 0
588      where:
589        1) The first four values after the name of the action declare the recvcounts array
590        2) The value 11346849 is the amount of instructions
591        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
592  */
593   double clock = smpi_process()->simulated_elapsed();
594   unsigned long comm_size = MPI_COMM_WORLD->size();
595   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
596   int comp_size = parse_double(action[2+comm_size]);
597   int my_proc_id                     = Actor::self()->getPid();
598   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
599   MPI_Datatype MPI_CURRENT_TYPE =
600       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
601
602   for (unsigned int i = 0; i < comm_size; i++) {
603     recvcounts->push_back(std::stoi(action[i + 2]));
604   }
605   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
606
607   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
608                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
609                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
610                                                        Datatype::encode(MPI_CURRENT_TYPE)));
611
612   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
613   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
614
615   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
616   smpi_execute_flops(comp_size);
617
618   TRACE_smpi_comm_out(my_proc_id);
619   log_timed_action (action, clock);
620 }
621
622 static void action_allgather(simgrid::xbt::ReplayAction& action)
623 {
624   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
625         0 allGather 275427 275427
626     where:
627         1) 275427 is the sendcount
628         2) 275427 is the recvcount
629         3) No more values mean that the datatype for sent and receive buffer is the default one, see
630     simgrid::smpi::Datatype::decode().
631   */
632   double clock = smpi_process()->simulated_elapsed();
633
634   CHECK_ACTION_PARAMS(action, 2, 2)
635   int sendcount = std::stoi(action[2]);
636   int recvcount = std::stoi(action[3]);
637
638   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
639   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
640
641   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
642   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
643
644   int my_proc_id = Actor::self()->getPid();
645
646   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
647                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
648                                                     Datatype::encode(MPI_CURRENT_TYPE),
649                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
650
651   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
652
653   TRACE_smpi_comm_out(my_proc_id);
654   log_timed_action (action, clock);
655 }
656
657 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
658 {
659   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
660         0 allGatherV 275427 275427 275427 275427 204020
661      where:
662         1) 275427 is the sendcount
663         2) The next four elements declare the recvcounts array
664         3) No more values mean that the datatype for sent and receive buffer is the default one, see
665      simgrid::smpi::Datatype::decode().
666   */
667   double clock = smpi_process()->simulated_elapsed();
668
669   unsigned long comm_size = MPI_COMM_WORLD->size();
670   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
671   int sendcount = std::stoi(action[2]);
672   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
673   std::vector<int> disps(comm_size, 0);
674
675   int datatype_index = 0, disp_index = 0;
676   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
677     datatype_index = 3 + comm_size;
678     disp_index     = datatype_index + 1;
679   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
680     datatype_index = -1;
681     disp_index     = 3 + comm_size;
682   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
683     datatype_index = 3 + comm_size;
684   }
685
686   if (disp_index != 0) {
687     for (unsigned int i = 0; i < comm_size; i++)
688       disps[i]          = std::stoi(action[disp_index + i]);
689   }
690
691   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
692                                                      : MPI_DEFAULT_TYPE};
693   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
694                                                       : MPI_DEFAULT_TYPE};
695
696   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
697
698   for (unsigned int i = 0; i < comm_size; i++) {
699     (*recvcounts)[i] = std::stoi(action[i + 3]);
700   }
701   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
702   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
703
704   int my_proc_id = Actor::self()->getPid();
705
706   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
707                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
708                                                        Datatype::encode(MPI_CURRENT_TYPE),
709                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
710
711   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
712                     MPI_COMM_WORLD);
713
714   TRACE_smpi_comm_out(my_proc_id);
715   log_timed_action (action, clock);
716 }
717
718 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
719 {
720   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
721         0 allToAllV 100 1 7 10 12 100 1 70 10 5
722      where:
723         1) 100 is the size of the send buffer *sizeof(int),
724         2) 1 7 10 12 is the sendcounts array
725         3) 100*sizeof(int) is the size of the receiver buffer
726         4)  1 70 10 5 is the recvcounts array
727   */
728   double clock = smpi_process()->simulated_elapsed();
729
730   unsigned long comm_size = MPI_COMM_WORLD->size();
731   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
732   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
733   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
734   std::vector<int> senddisps(comm_size, 0);
735   std::vector<int> recvdisps(comm_size, 0);
736
737   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
738                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
739                                       : MPI_DEFAULT_TYPE;
740   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
741                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
742                                      : MPI_DEFAULT_TYPE};
743
744   int send_buf_size=parse_double(action[2]);
745   int recv_buf_size=parse_double(action[3+comm_size]);
746   int my_proc_id = Actor::self()->getPid();
747   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
748   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
749
750   for (unsigned int i = 0; i < comm_size; i++) {
751     (*sendcounts)[i] = std::stoi(action[3 + i]);
752     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
753   }
754   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
755   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
756
757   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
758                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
759                                                        Datatype::encode(MPI_CURRENT_TYPE),
760                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
761
762   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
763                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
764
765   TRACE_smpi_comm_out(my_proc_id);
766   log_timed_action (action, clock);
767 }
768
769 }} // namespace simgrid::smpi
770
771 /** @brief Only initialize the replay, don't do it for real */
772 void smpi_replay_init(int* argc, char*** argv)
773 {
774   simgrid::smpi::Process::init(argc, argv);
775   smpi_process()->mark_as_initialized();
776   smpi_process()->set_replaying(true);
777
778   int my_proc_id = Actor::self()->getPid();
779   TRACE_smpi_init(my_proc_id);
780   TRACE_smpi_computing_init(my_proc_id);
781   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
782   TRACE_smpi_comm_out(my_proc_id);
783   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
784   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
785   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
786   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
787   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
788
789   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
790   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
791   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
792   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
793   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
794   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
795   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
796   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
797   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
798   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
799   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
800   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
801   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
802   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
803   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
804   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
805   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
806   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
807   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
808   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
809   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
810
811   //if we have a delayed start, sleep here.
812   if(*argc>2){
813     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
814     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
815     smpi_execute_flops(value);
816   } else {
817     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
818     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
819     smpi_execute_flops(0.0);
820   }
821 }
822
823 /** @brief actually run the replay after initialization */
824 void smpi_replay_main(int* argc, char*** argv)
825 {
826   simgrid::xbt::replay_runner(*argc, *argv);
827
828   /* and now, finalize everything */
829   /* One active process will stop. Decrease the counter*/
830   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
831   if (not get_reqq_self()->empty()) {
832     unsigned int count_requests=get_reqq_self()->size();
833     MPI_Request requests[count_requests];
834     MPI_Status status[count_requests];
835     unsigned int i=0;
836
837     for (auto const& req : *get_reqq_self()) {
838       requests[i] = req;
839       i++;
840     }
841     simgrid::smpi::Request::waitall(count_requests, requests, status);
842   }
843   delete get_reqq_self();
844   active_processes--;
845
846   if(active_processes==0){
847     /* Last process alive speaking: end the simulated timer */
848     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
849     smpi_free_replay_tmp_buffers();
850   }
851
852   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
853
854   smpi_process()->finalize();
855
856   TRACE_smpi_comm_out(Actor::self()->getPid());
857   TRACE_smpi_finalize(Actor::self()->getPid());
858 }
859
860 /** @brief chain a replay initialization and a replay start */
861 void smpi_replay_run(int* argc, char*** argv)
862 {
863   smpi_replay_init(argc, argv);
864   smpi_replay_main(argc, argv);
865 }