Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Use std::function for i/send, i/recv actions
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 {
44   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45     std::string s = boost::algorithm::join(action, " ");
46     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
47   }
48 }
49
50 static std::vector<MPI_Request>* get_reqq_self()
51 {
52   return reqq.at(Actor::self()->getPid());
53 }
54
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 {
57    reqq.insert({Actor::self()->getPid(), mpi_request});
58 }
59
60 /* Helper function */
61 static double parse_double(std::string string)
62 {
63   return xbt_str_parse_double(string.c_str(), "%s is not a double");
64 }
65
66 namespace simgrid {
67 namespace smpi {
68
69 namespace Replay {
70 class ActionArgParser {
71 public:
72   virtual void parse(simgrid::xbt::ReplayAction& action){};
73 };
74
75 class SendRecvParser : public ActionArgParser {
76 public:
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 template <class T> class ReplayAction {
92 protected:
93   const std::string name;
94   T args;
95
96   /*
97    * Used to compute the duration of this action.
98    */
99   double start_time;
100
101   int my_proc_id;
102
103 public:
104   explicit ReplayAction(std::string name) : name(name), start_time(0), my_proc_id(simgrid::s4u::Actor::self()->getPid())
105   {
106   }
107
108   virtual void execute(simgrid::xbt::ReplayAction& action)
109   {
110     // Needs to be re-initialized for every action, hence here
111     start_time = smpi_process()->simulated_elapsed();
112     args.parse(action);
113     kernel(action);
114     log_timed_action(action, start_time);
115   }
116
117   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
118 };
119
120 class WaitAction : public ReplayAction<ActionArgParser> {
121 public:
122   WaitAction() : ReplayAction("Wait") {}
123   void kernel(simgrid::xbt::ReplayAction& action) override
124   {
125     CHECK_ACTION_PARAMS(action, 0, 0)
126     MPI_Status status;
127
128     std::string s = boost::algorithm::join(action, " ");
129     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
130     MPI_Request request = get_reqq_self()->back();
131     get_reqq_self()->pop_back();
132
133     if (request == nullptr) {
134       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
135        * return.*/
136       return;
137     }
138
139     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
140
141     MPI_Group group          = request->comm()->group();
142     int src_traced           = group->rank(request->src());
143     int dst_traced           = group->rank(request->dst());
144     bool is_wait_for_receive = (request->flags() & RECV);
145     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
146     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
147
148     Request::wait(&request, &status);
149
150     TRACE_smpi_comm_out(rank);
151     if (is_wait_for_receive)
152       TRACE_smpi_recv(src_traced, dst_traced, 0);
153   }
154 };
155
156 class SendAction : public ReplayAction<SendRecvParser> {
157 public:
158   SendAction() = delete;
159   SendAction(std::string name) : ReplayAction(name) {}
160   void kernel(simgrid::xbt::ReplayAction& action) override
161   {
162     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
163
164     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
165                                                                                  Datatype::encode(args.datatype1)));
166     if (not TRACE_smpi_view_internals())
167       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
168
169     if (name == "send") {
170       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
171     } else if (name == "Isend") {
172       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
173       get_reqq_self()->push_back(request);
174     } else {
175       xbt_die("Don't know this action, %s", name.c_str());
176     }
177
178     TRACE_smpi_comm_out(my_proc_id);
179   }
180 };
181
182 class RecvAction : public ReplayAction<SendRecvParser> {
183 public:
184   RecvAction() = delete;
185   explicit RecvAction(std::string name) : ReplayAction(name) {}
186   void kernel(simgrid::xbt::ReplayAction& action) override
187   {
188     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
189
190     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
191                                                                                  Datatype::encode(args.datatype1)));
192
193     MPI_Status status;
194     // unknown size from the receiver point of view
195     if (args.size <= 0.0) {
196       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
197       args.size = status.count;
198     }
199
200     if (name == "recv") {
201       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
202     } else if (name == "Irecv") {
203       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
204       get_reqq_self()->push_back(request);
205     }
206
207     TRACE_smpi_comm_out(my_proc_id);
208     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
209     if (name == "recv" && not TRACE_smpi_view_internals()) {
210       TRACE_smpi_recv(src_traced, my_proc_id, 0);
211     }
212   }
213 };
214
215 } // Replay Namespace
216
217 static void action_init(simgrid::xbt::ReplayAction& action)
218 {
219   XBT_DEBUG("Initialize the counters");
220   CHECK_ACTION_PARAMS(action, 0, 1)
221   if (action.size() > 2)
222     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
223   else
224     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
225
226   /* start a simulated timer */
227   smpi_process()->simulated_start();
228   /*initialize the number of active processes */
229   active_processes = smpi_process_count();
230
231   set_reqq_self(new std::vector<MPI_Request>);
232 }
233
234 static void action_finalize(simgrid::xbt::ReplayAction& action)
235 {
236   /* Nothing to do */
237 }
238
239 static void action_comm_size(simgrid::xbt::ReplayAction& action)
240 {
241   communicator_size = parse_double(action[2]);
242   log_timed_action (action, smpi_process()->simulated_elapsed());
243 }
244
245 static void action_comm_split(simgrid::xbt::ReplayAction& action)
246 {
247   log_timed_action (action, smpi_process()->simulated_elapsed());
248 }
249
250 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
251 {
252   log_timed_action (action, smpi_process()->simulated_elapsed());
253 }
254
255 static void action_compute(simgrid::xbt::ReplayAction& action)
256 {
257   CHECK_ACTION_PARAMS(action, 1, 0)
258   double clock = smpi_process()->simulated_elapsed();
259   double flops= parse_double(action[2]);
260   int my_proc_id = Actor::self()->getPid();
261
262   TRACE_smpi_computing_in(my_proc_id, flops);
263   smpi_execute_flops(flops);
264   TRACE_smpi_computing_out(my_proc_id);
265
266   log_timed_action (action, clock);
267 }
268
269 static void action_test(simgrid::xbt::ReplayAction& action)
270 {
271   CHECK_ACTION_PARAMS(action, 0, 0)
272   double clock = smpi_process()->simulated_elapsed();
273   MPI_Status status;
274
275   MPI_Request request = get_reqq_self()->back();
276   get_reqq_self()->pop_back();
277   //if request is null here, this may mean that a previous test has succeeded
278   //Different times in traced application and replayed version may lead to this
279   //In this case, ignore the extra calls.
280   if(request!=nullptr){
281     int my_proc_id = Actor::self()->getPid();
282     TRACE_smpi_testing_in(my_proc_id);
283
284     int flag = Request::test(&request, &status);
285
286     XBT_DEBUG("MPI_Test result: %d", flag);
287     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
288     get_reqq_self()->push_back(request);
289
290     TRACE_smpi_testing_out(my_proc_id);
291   }
292   log_timed_action (action, clock);
293 }
294
295 static void action_waitall(simgrid::xbt::ReplayAction& action)
296 {
297   CHECK_ACTION_PARAMS(action, 0, 0)
298   double clock = smpi_process()->simulated_elapsed();
299   const unsigned int count_requests = get_reqq_self()->size();
300
301   if (count_requests>0) {
302     MPI_Status status[count_requests];
303
304     int my_proc_id_traced = Actor::self()->getPid();
305     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
306                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
307     int recvs_snd[count_requests];
308     int recvs_rcv[count_requests];
309     for (unsigned int i = 0; i < count_requests; i++) {
310       const auto& req = (*get_reqq_self())[i];
311       if (req && (req->flags() & RECV)) {
312         recvs_snd[i] = req->src();
313         recvs_rcv[i] = req->dst();
314       } else
315         recvs_snd[i] = -100;
316    }
317    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
318
319    for (unsigned i = 0; i < count_requests; i++) {
320      if (recvs_snd[i]!=-100)
321        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
322    }
323    TRACE_smpi_comm_out(my_proc_id_traced);
324   }
325   log_timed_action (action, clock);
326 }
327
328 static void action_barrier(simgrid::xbt::ReplayAction& action)
329 {
330   double clock = smpi_process()->simulated_elapsed();
331   int my_proc_id = Actor::self()->getPid();
332   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
333
334   Colls::barrier(MPI_COMM_WORLD);
335
336   TRACE_smpi_comm_out(my_proc_id);
337   log_timed_action (action, clock);
338 }
339
340 static void action_bcast(simgrid::xbt::ReplayAction& action)
341 {
342   CHECK_ACTION_PARAMS(action, 1, 2)
343   double size = parse_double(action[2]);
344   double clock = smpi_process()->simulated_elapsed();
345   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
346   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
347   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
348
349   int my_proc_id = Actor::self()->getPid();
350   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
351                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
352                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
353
354   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
355
356   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
357
358   TRACE_smpi_comm_out(my_proc_id);
359   log_timed_action (action, clock);
360 }
361
362 static void action_reduce(simgrid::xbt::ReplayAction& action)
363 {
364   CHECK_ACTION_PARAMS(action, 2, 2)
365   double comm_size = parse_double(action[2]);
366   double comp_size = parse_double(action[3]);
367   double clock = smpi_process()->simulated_elapsed();
368   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
369
370   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
371
372   int my_proc_id = Actor::self()->getPid();
373   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
374                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
375                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
376
377   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
378   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
379   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
380   smpi_execute_flops(comp_size);
381
382   TRACE_smpi_comm_out(my_proc_id);
383   log_timed_action (action, clock);
384 }
385
386 static void action_allReduce(simgrid::xbt::ReplayAction& action)
387 {
388   CHECK_ACTION_PARAMS(action, 2, 1)
389   double comm_size = parse_double(action[2]);
390   double comp_size = parse_double(action[3]);
391
392   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
393
394   double clock = smpi_process()->simulated_elapsed();
395   int my_proc_id = Actor::self()->getPid();
396   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
397                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
398
399   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
400   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
401   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
402   smpi_execute_flops(comp_size);
403
404   TRACE_smpi_comm_out(my_proc_id);
405   log_timed_action (action, clock);
406 }
407
408 static void action_allToAll(simgrid::xbt::ReplayAction& action)
409 {
410   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
411   double clock = smpi_process()->simulated_elapsed();
412   unsigned long comm_size = MPI_COMM_WORLD->size();
413   int send_size = parse_double(action[2]);
414   int recv_size = parse_double(action[3]);
415   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
416   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
417
418   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
419   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
420
421   int my_proc_id = Actor::self()->getPid();
422   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
423                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
424                                                     Datatype::encode(MPI_CURRENT_TYPE),
425                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
426
427   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
428
429   TRACE_smpi_comm_out(my_proc_id);
430   log_timed_action (action, clock);
431 }
432
433 static void action_gather(simgrid::xbt::ReplayAction& action)
434 {
435   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
436         0 gather 68 68 0 0 0
437       where:
438         1) 68 is the sendcounts
439         2) 68 is the recvcounts
440         3) 0 is the root node
441         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
442         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
443   */
444   CHECK_ACTION_PARAMS(action, 2, 3)
445   double clock = smpi_process()->simulated_elapsed();
446   unsigned long comm_size = MPI_COMM_WORLD->size();
447   int send_size = parse_double(action[2]);
448   int recv_size = parse_double(action[3]);
449   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
450   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
451
452   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
453   void *recv = nullptr;
454   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
455   int rank = MPI_COMM_WORLD->rank();
456
457   if(rank==root)
458     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
459
460   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
461                                                                         Datatype::encode(MPI_CURRENT_TYPE),
462                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
463
464   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
465
466   TRACE_smpi_comm_out(Actor::self()->getPid());
467   log_timed_action (action, clock);
468 }
469
470 static void action_scatter(simgrid::xbt::ReplayAction& action)
471 {
472   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
473         0 gather 68 68 0 0 0
474       where:
475         1) 68 is the sendcounts
476         2) 68 is the recvcounts
477         3) 0 is the root node
478         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
479         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
480   */
481   CHECK_ACTION_PARAMS(action, 2, 3)
482   double clock                   = smpi_process()->simulated_elapsed();
483   unsigned long comm_size        = MPI_COMM_WORLD->size();
484   int send_size                  = parse_double(action[2]);
485   int recv_size                  = parse_double(action[3]);
486   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
487   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
488
489   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
490   void* recv = nullptr;
491   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
492   int rank = MPI_COMM_WORLD->rank();
493
494   if (rank == root)
495     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
496
497   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
498                                                                         Datatype::encode(MPI_CURRENT_TYPE),
499                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
500
501   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
502
503   TRACE_smpi_comm_out(Actor::self()->getPid());
504   log_timed_action(action, clock);
505 }
506
507 static void action_gatherv(simgrid::xbt::ReplayAction& action)
508 {
509   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
510        0 gather 68 68 10 10 10 0 0 0
511      where:
512        1) 68 is the sendcount
513        2) 68 10 10 10 is the recvcounts
514        3) 0 is the root node
515        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
516        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
517   */
518   double clock = smpi_process()->simulated_elapsed();
519   unsigned long comm_size = MPI_COMM_WORLD->size();
520   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
521   int send_size = parse_double(action[2]);
522   std::vector<int> disps(comm_size, 0);
523   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
524
525   MPI_Datatype MPI_CURRENT_TYPE =
526       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
527   MPI_Datatype MPI_CURRENT_TYPE2{
528       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
529
530   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
531   void *recv = nullptr;
532   for (unsigned int i = 0; i < comm_size; i++) {
533     (*recvcounts)[i] = std::stoi(action[i + 3]);
534   }
535   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
536
537   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
538   int rank = MPI_COMM_WORLD->rank();
539
540   if(rank==root)
541     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
542
543   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
544                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
545                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
546
547   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
548                  MPI_COMM_WORLD);
549
550   TRACE_smpi_comm_out(Actor::self()->getPid());
551   log_timed_action (action, clock);
552 }
553
554 static void action_scatterv(simgrid::xbt::ReplayAction& action)
555 {
556   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
557        0 gather 68 10 10 10 68 0 0 0
558      where:
559        1) 68 10 10 10 is the sendcounts
560        2) 68 is the recvcount
561        3) 0 is the root node
562        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
563        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
564   */
565   double clock  = smpi_process()->simulated_elapsed();
566   unsigned long comm_size = MPI_COMM_WORLD->size();
567   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
568   int recv_size = parse_double(action[2 + comm_size]);
569   std::vector<int> disps(comm_size, 0);
570   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
571
572   MPI_Datatype MPI_CURRENT_TYPE =
573       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
574   MPI_Datatype MPI_CURRENT_TYPE2{
575       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
576
577   void* send = nullptr;
578   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
579   for (unsigned int i = 0; i < comm_size; i++) {
580     (*sendcounts)[i] = std::stoi(action[i + 2]);
581   }
582   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
583
584   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
585   int rank = MPI_COMM_WORLD->rank();
586
587   if (rank == root)
588     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
589
590   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
591                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
592                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
593
594   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
595                   MPI_COMM_WORLD);
596
597   TRACE_smpi_comm_out(Actor::self()->getPid());
598   log_timed_action(action, clock);
599 }
600
601 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
602 {
603   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
604        0 reduceScatter 275427 275427 275427 204020 11346849 0
605      where:
606        1) The first four values after the name of the action declare the recvcounts array
607        2) The value 11346849 is the amount of instructions
608        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
609  */
610   double clock = smpi_process()->simulated_elapsed();
611   unsigned long comm_size = MPI_COMM_WORLD->size();
612   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
613   int comp_size = parse_double(action[2+comm_size]);
614   int my_proc_id                     = Actor::self()->getPid();
615   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
616   MPI_Datatype MPI_CURRENT_TYPE =
617       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
618
619   for (unsigned int i = 0; i < comm_size; i++) {
620     recvcounts->push_back(std::stoi(action[i + 2]));
621   }
622   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
623
624   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
625                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
626                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
627                                                        Datatype::encode(MPI_CURRENT_TYPE)));
628
629   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
630   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
631
632   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
633   smpi_execute_flops(comp_size);
634
635   TRACE_smpi_comm_out(my_proc_id);
636   log_timed_action (action, clock);
637 }
638
639 static void action_allgather(simgrid::xbt::ReplayAction& action)
640 {
641   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
642         0 allGather 275427 275427
643     where:
644         1) 275427 is the sendcount
645         2) 275427 is the recvcount
646         3) No more values mean that the datatype for sent and receive buffer is the default one, see
647     simgrid::smpi::Datatype::decode().
648   */
649   double clock = smpi_process()->simulated_elapsed();
650
651   CHECK_ACTION_PARAMS(action, 2, 2)
652   int sendcount = std::stoi(action[2]);
653   int recvcount = std::stoi(action[3]);
654
655   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
656   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
657
658   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
659   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
660
661   int my_proc_id = Actor::self()->getPid();
662
663   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
664                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
665                                                     Datatype::encode(MPI_CURRENT_TYPE),
666                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
667
668   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
669
670   TRACE_smpi_comm_out(my_proc_id);
671   log_timed_action (action, clock);
672 }
673
674 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
675 {
676   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
677         0 allGatherV 275427 275427 275427 275427 204020
678      where:
679         1) 275427 is the sendcount
680         2) The next four elements declare the recvcounts array
681         3) No more values mean that the datatype for sent and receive buffer is the default one, see
682      simgrid::smpi::Datatype::decode().
683   */
684   double clock = smpi_process()->simulated_elapsed();
685
686   unsigned long comm_size = MPI_COMM_WORLD->size();
687   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
688   int sendcount = std::stoi(action[2]);
689   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
690   std::vector<int> disps(comm_size, 0);
691
692   int datatype_index = 0, disp_index = 0;
693   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
694     datatype_index = 3 + comm_size;
695     disp_index     = datatype_index + 1;
696   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
697     datatype_index = -1;
698     disp_index     = 3 + comm_size;
699   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
700     datatype_index = 3 + comm_size;
701   }
702
703   if (disp_index != 0) {
704     for (unsigned int i = 0; i < comm_size; i++)
705       disps[i]          = std::stoi(action[disp_index + i]);
706   }
707
708   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
709                                                      : MPI_DEFAULT_TYPE};
710   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
711                                                       : MPI_DEFAULT_TYPE};
712
713   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
714
715   for (unsigned int i = 0; i < comm_size; i++) {
716     (*recvcounts)[i] = std::stoi(action[i + 3]);
717   }
718   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
719   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
720
721   int my_proc_id = Actor::self()->getPid();
722
723   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
724                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
725                                                        Datatype::encode(MPI_CURRENT_TYPE),
726                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
727
728   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
729                     MPI_COMM_WORLD);
730
731   TRACE_smpi_comm_out(my_proc_id);
732   log_timed_action (action, clock);
733 }
734
735 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
736 {
737   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
738         0 allToAllV 100 1 7 10 12 100 1 70 10 5
739      where:
740         1) 100 is the size of the send buffer *sizeof(int),
741         2) 1 7 10 12 is the sendcounts array
742         3) 100*sizeof(int) is the size of the receiver buffer
743         4)  1 70 10 5 is the recvcounts array
744   */
745   double clock = smpi_process()->simulated_elapsed();
746
747   unsigned long comm_size = MPI_COMM_WORLD->size();
748   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
749   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
750   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
751   std::vector<int> senddisps(comm_size, 0);
752   std::vector<int> recvdisps(comm_size, 0);
753
754   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
755                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
756                                       : MPI_DEFAULT_TYPE;
757   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
758                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
759                                      : MPI_DEFAULT_TYPE};
760
761   int send_buf_size=parse_double(action[2]);
762   int recv_buf_size=parse_double(action[3+comm_size]);
763   int my_proc_id = Actor::self()->getPid();
764   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
765   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
766
767   for (unsigned int i = 0; i < comm_size; i++) {
768     (*sendcounts)[i] = std::stoi(action[3 + i]);
769     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
770   }
771   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
772   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
773
774   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
775                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
776                                                        Datatype::encode(MPI_CURRENT_TYPE),
777                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
778
779   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
780                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
781
782   TRACE_smpi_comm_out(my_proc_id);
783   log_timed_action (action, clock);
784 }
785
786 }} // namespace simgrid::smpi
787
788 /** @brief Only initialize the replay, don't do it for real */
789 void smpi_replay_init(int* argc, char*** argv)
790 {
791   simgrid::smpi::Process::init(argc, argv);
792   smpi_process()->mark_as_initialized();
793   smpi_process()->set_replaying(true);
794
795   int my_proc_id = Actor::self()->getPid();
796   TRACE_smpi_init(my_proc_id);
797   TRACE_smpi_computing_init(my_proc_id);
798   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
799   TRACE_smpi_comm_out(my_proc_id);
800   xbt_replay_action_register("init",       simgrid::smpi::action_init);
801   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
802   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
803   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
804   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
805
806   std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
807   std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
808   std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
809   std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
810   std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
811
812   xbt_replay_action_register("send",
813                              std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
814   xbt_replay_action_register("Isend",
815                              std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
816   xbt_replay_action_register("recv",
817                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
818   xbt_replay_action_register("Irecv",
819                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
820   xbt_replay_action_register("test", simgrid::smpi::action_test);
821   xbt_replay_action_register("wait",
822                              std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
823   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
824   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
825   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
826   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
827   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
828   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
829   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
830   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
831   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
832   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
833   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
834   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
835   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
836   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
837   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
838
839   //if we have a delayed start, sleep here.
840   if(*argc>2){
841     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
842     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
843     smpi_execute_flops(value);
844   } else {
845     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
846     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
847     smpi_execute_flops(0.0);
848   }
849 }
850
851 /** @brief actually run the replay after initialization */
852 void smpi_replay_main(int* argc, char*** argv)
853 {
854   simgrid::xbt::replay_runner(*argc, *argv);
855
856   /* and now, finalize everything */
857   /* One active process will stop. Decrease the counter*/
858   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
859   if (not get_reqq_self()->empty()) {
860     unsigned int count_requests=get_reqq_self()->size();
861     MPI_Request requests[count_requests];
862     MPI_Status status[count_requests];
863     unsigned int i=0;
864
865     for (auto const& req : *get_reqq_self()) {
866       requests[i] = req;
867       i++;
868     }
869     simgrid::smpi::Request::waitall(count_requests, requests, status);
870   }
871   delete get_reqq_self();
872   active_processes--;
873
874   if(active_processes==0){
875     /* Last process alive speaking: end the simulated timer */
876     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
877     smpi_free_replay_tmp_buffers();
878   }
879
880   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
881
882   smpi_process()->finalize();
883
884   TRACE_smpi_comm_out(Actor::self()->getPid());
885   TRACE_smpi_finalize(Actor::self()->getPid());
886 }
887
888 /** @brief chain a replay initialization and a replay start */
889 void smpi_replay_run(int* argc, char*** argv)
890 {
891   smpi_replay_init(argc, argv);
892   smpi_replay_main(argc, argv);
893 }