Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4417d46922c07fc16a7381597ed26993cd265e3b
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 class CollCommParser : public ActionArgParser {
104 public:
105   double size;
106   double comm_size;
107   double comp_size;
108   int send_size;
109   int recv_size;
110   int root = 0;
111   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
112   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
113 };
114
115 class BcastArgParser : public CollCommParser {
116 public:
117   void parse(simgrid::xbt::ReplayAction& action) override
118   {
119     CHECK_ACTION_PARAMS(action, 1, 2)
120     size = parse_double(action[2]);
121     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
122     if (action.size() > 4)
123       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
124   }
125 };
126
127 class ReduceArgParser : public CollCommParser {
128 public:
129   void parse(simgrid::xbt::ReplayAction& action) override
130   {
131     CHECK_ACTION_PARAMS(action, 2, 2)
132     comm_size = parse_double(action[2]);
133     comp_size = parse_double(action[3]);
134     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
135     if (action.size() > 5)
136       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
137   }
138 };
139
140 class AllReduceArgParser : public CollCommParser {
141 public:
142   void parse(simgrid::xbt::ReplayAction& action) override
143   {
144     CHECK_ACTION_PARAMS(action, 2, 1)
145     comm_size = parse_double(action[2]);
146     comp_size = parse_double(action[3]);
147     if (action.size() > 4)
148       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
149   }
150 };
151
152 class AllToAllArgParser : public CollCommParser {
153 public:
154   void parse(simgrid::xbt::ReplayAction& action) override
155   {
156     CHECK_ACTION_PARAMS(action, 2, 1)
157     comm_size = MPI_COMM_WORLD->size();
158     send_size = parse_double(action[2]);
159     recv_size = parse_double(action[3]);
160
161     if (action.size() > 4)
162       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
163     if (action.size() > 5)
164       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
165   }
166 };
167
168 template <class T> class ReplayAction {
169 protected:
170   const std::string name;
171   T args;
172
173   int my_proc_id;
174
175 public:
176   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
177
178   virtual void execute(simgrid::xbt::ReplayAction& action)
179   {
180     // Needs to be re-initialized for every action, hence here
181     double start_time = smpi_process()->simulated_elapsed();
182     args.parse(action);
183     kernel(action);
184     if (name != "Init")
185       log_timed_action(action, start_time);
186   }
187
188   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
189
190   void* send_buffer(int size)
191   {
192     return smpi_get_tmp_sendbuffer(size);
193   }
194
195   void* recv_buffer(int size)
196   {
197     return smpi_get_tmp_recvbuffer(size);
198   }
199 };
200
201 class WaitAction : public ReplayAction<ActionArgParser> {
202 public:
203   WaitAction() : ReplayAction("Wait") {}
204   void kernel(simgrid::xbt::ReplayAction& action) override
205   {
206     std::string s = boost::algorithm::join(action, " ");
207     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
208     MPI_Request request = get_reqq_self()->back();
209     get_reqq_self()->pop_back();
210
211     if (request == nullptr) {
212       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
213        * return.*/
214       return;
215     }
216
217     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
218
219     // Must be taken before Request::wait() since the request may be set to
220     // MPI_REQUEST_NULL by Request::wait!
221     int src                  = request->comm()->group()->rank(request->src());
222     int dst                  = request->comm()->group()->rank(request->dst());
223     bool is_wait_for_receive = (request->flags() & RECV);
224     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
225     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
226
227     MPI_Status status;
228     Request::wait(&request, &status);
229
230     TRACE_smpi_comm_out(rank);
231     if (is_wait_for_receive)
232       TRACE_smpi_recv(src, dst, 0);
233   }
234 };
235
236 class SendAction : public ReplayAction<SendRecvParser> {
237 public:
238   SendAction() = delete;
239   SendAction(std::string name) : ReplayAction(name) {}
240   void kernel(simgrid::xbt::ReplayAction& action) override
241   {
242     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
243
244     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
245                                                                                  Datatype::encode(args.datatype1)));
246     if (not TRACE_smpi_view_internals())
247       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
248
249     if (name == "send") {
250       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
251     } else if (name == "Isend") {
252       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
253       get_reqq_self()->push_back(request);
254     } else {
255       xbt_die("Don't know this action, %s", name.c_str());
256     }
257
258     TRACE_smpi_comm_out(my_proc_id);
259   }
260 };
261
262 class RecvAction : public ReplayAction<SendRecvParser> {
263 public:
264   RecvAction() = delete;
265   explicit RecvAction(std::string name) : ReplayAction(name) {}
266   void kernel(simgrid::xbt::ReplayAction& action) override
267   {
268     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
269
270     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
271                                                                                  Datatype::encode(args.datatype1)));
272
273     MPI_Status status;
274     // unknown size from the receiver point of view
275     if (args.size <= 0.0) {
276       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
277       args.size = status.count;
278     }
279
280     if (name == "recv") {
281       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
282     } else if (name == "Irecv") {
283       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
284       get_reqq_self()->push_back(request);
285     }
286
287     TRACE_smpi_comm_out(my_proc_id);
288     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
289     if (name == "recv" && not TRACE_smpi_view_internals()) {
290       TRACE_smpi_recv(src_traced, my_proc_id, 0);
291     }
292   }
293 };
294
295 class ComputeAction : public ReplayAction<ComputeParser> {
296 public:
297   ComputeAction() : ReplayAction("compute") {}
298   void kernel(simgrid::xbt::ReplayAction& action) override
299   {
300     TRACE_smpi_computing_in(my_proc_id, args.flops);
301     smpi_execute_flops(args.flops);
302     TRACE_smpi_computing_out(my_proc_id);
303   }
304 };
305
306 class TestAction : public ReplayAction<ActionArgParser> {
307 public:
308   TestAction() : ReplayAction("Test") {}
309   void kernel(simgrid::xbt::ReplayAction& action) override
310   {
311     MPI_Request request = get_reqq_self()->back();
312     get_reqq_self()->pop_back();
313     // if request is null here, this may mean that a previous test has succeeded
314     // Different times in traced application and replayed version may lead to this
315     // In this case, ignore the extra calls.
316     if (request != nullptr) {
317       TRACE_smpi_testing_in(my_proc_id);
318
319       MPI_Status status;
320       int flag = Request::test(&request, &status);
321
322       XBT_DEBUG("MPI_Test result: %d", flag);
323       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
324        * nullptr.*/
325       get_reqq_self()->push_back(request);
326
327       TRACE_smpi_testing_out(my_proc_id);
328     }
329   }
330 };
331
332 class InitAction : public ReplayAction<ActionArgParser> {
333 public:
334   InitAction() : ReplayAction("Init") {}
335   void kernel(simgrid::xbt::ReplayAction& action) override
336   {
337     CHECK_ACTION_PARAMS(action, 0, 1)
338     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
339                                            : MPI_BYTE;  // default TAU datatype
340
341     /* start a simulated timer */
342     smpi_process()->simulated_start();
343     /*initialize the number of active processes */
344     active_processes = smpi_process_count();
345
346     set_reqq_self(new std::vector<MPI_Request>);
347   }
348 };
349
350 class CommunicatorAction : public ReplayAction<ActionArgParser> {
351 public:
352   CommunicatorAction() : ReplayAction("Comm") {}
353   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
354 };
355
356 class WaitAllAction : public ReplayAction<ActionArgParser> {
357 public:
358   WaitAllAction() : ReplayAction("waitAll") {}
359   void kernel(simgrid::xbt::ReplayAction& action) override
360   {
361     const unsigned int count_requests = get_reqq_self()->size();
362
363     if (count_requests > 0) {
364       TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
365                          new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
366       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
367       for (const auto& req : (*get_reqq_self())) {
368         if (req && (req->flags() & RECV)) {
369           sender_receiver.push_back({req->src(), req->dst()});
370         }
371       }
372       MPI_Status status[count_requests];
373       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
374
375       for (auto& pair : sender_receiver) {
376         TRACE_smpi_recv(pair.first, pair.second, 0);
377       }
378       TRACE_smpi_comm_out(my_proc_id);
379     }
380   }
381 };
382
383 class BarrierAction : public ReplayAction<ActionArgParser> {
384 public:
385   BarrierAction() : ReplayAction("barrier") {}
386   void kernel(simgrid::xbt::ReplayAction& action) override
387   {
388     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
389     Colls::barrier(MPI_COMM_WORLD);
390     TRACE_smpi_comm_out(my_proc_id);
391   }
392 };
393
394 class BcastAction : public ReplayAction<BcastArgParser> {
395 public:
396   BcastAction() : ReplayAction("bcast") {}
397   void kernel(simgrid::xbt::ReplayAction& action) override
398   {
399     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
400                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), 
401                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
402
403     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
404
405     TRACE_smpi_comm_out(my_proc_id);
406   }
407 };
408
409 class ReduceAction : public ReplayAction<ReduceArgParser> {
410 public:
411   ReduceAction() : ReplayAction("reduce") {}
412   void kernel(simgrid::xbt::ReplayAction& action) override
413   {
414     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
415                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
416                                                       args.comm_size, -1, Datatype::encode(args.datatype1), ""));
417
418     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
419         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
420     smpi_execute_flops(args.comp_size);
421
422     TRACE_smpi_comm_out(my_proc_id);
423   }
424 };
425
426 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
427 public:
428   AllReduceAction() : ReplayAction("allReduce") {}
429   void kernel(simgrid::xbt::ReplayAction& action) override
430   {
431     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
432                                                                                 Datatype::encode(args.datatype1), ""));
433
434     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
435         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
436     smpi_execute_flops(args.comp_size);
437
438     TRACE_smpi_comm_out(my_proc_id);
439   }
440 };
441
442 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
443 public:
444   AllToAllAction() : ReplayAction("allToAll") {}
445   void kernel(simgrid::xbt::ReplayAction& action) override
446   {
447     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
448                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
449                                                     Datatype::encode(args.datatype1),
450                                                     Datatype::encode(args.datatype2)));
451
452     Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()), 
453       args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
454       args.recv_size, args.datatype2, MPI_COMM_WORLD);
455
456     TRACE_smpi_comm_out(my_proc_id);
457   }
458 };
459 } // Replay Namespace
460
461 static void action_gather(simgrid::xbt::ReplayAction& action)
462 {
463   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
464         0 gather 68 68 0 0 0
465       where:
466         1) 68 is the sendcounts
467         2) 68 is the recvcounts
468         3) 0 is the root node
469         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
470         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
471   */
472   CHECK_ACTION_PARAMS(action, 2, 3)
473   double clock = smpi_process()->simulated_elapsed();
474   unsigned long comm_size = MPI_COMM_WORLD->size();
475   int send_size = parse_double(action[2]);
476   int recv_size = parse_double(action[3]);
477   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
478   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
479
480   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
481   void *recv = nullptr;
482   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
483   int rank = MPI_COMM_WORLD->rank();
484
485   if(rank==root)
486     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
487
488   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
489                                                                         Datatype::encode(MPI_CURRENT_TYPE),
490                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
491
492   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
493
494   TRACE_smpi_comm_out(Actor::self()->getPid());
495   log_timed_action (action, clock);
496 }
497
498 static void action_scatter(simgrid::xbt::ReplayAction& action)
499 {
500   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
501         0 gather 68 68 0 0 0
502       where:
503         1) 68 is the sendcounts
504         2) 68 is the recvcounts
505         3) 0 is the root node
506         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
507         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
508   */
509   CHECK_ACTION_PARAMS(action, 2, 3)
510   double clock                   = smpi_process()->simulated_elapsed();
511   unsigned long comm_size        = MPI_COMM_WORLD->size();
512   int send_size                  = parse_double(action[2]);
513   int recv_size                  = parse_double(action[3]);
514   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
515   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
516
517   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
518   void* recv = nullptr;
519   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
520   int rank = MPI_COMM_WORLD->rank();
521
522   if (rank == root)
523     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
524
525   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
526                                                                         Datatype::encode(MPI_CURRENT_TYPE),
527                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
528
529   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
530
531   TRACE_smpi_comm_out(Actor::self()->getPid());
532   log_timed_action(action, clock);
533 }
534
535 static void action_gatherv(simgrid::xbt::ReplayAction& action)
536 {
537   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
538        0 gather 68 68 10 10 10 0 0 0
539      where:
540        1) 68 is the sendcount
541        2) 68 10 10 10 is the recvcounts
542        3) 0 is the root node
543        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
544        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
545   */
546   double clock = smpi_process()->simulated_elapsed();
547   unsigned long comm_size = MPI_COMM_WORLD->size();
548   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
549   int send_size = parse_double(action[2]);
550   std::vector<int> disps(comm_size, 0);
551   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
552
553   MPI_Datatype MPI_CURRENT_TYPE =
554       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
555   MPI_Datatype MPI_CURRENT_TYPE2{
556       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
557
558   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
559   void *recv = nullptr;
560   for (unsigned int i = 0; i < comm_size; i++) {
561     (*recvcounts)[i] = std::stoi(action[i + 3]);
562   }
563   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
564
565   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
566   int rank = MPI_COMM_WORLD->rank();
567
568   if(rank==root)
569     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
570
571   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
572                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
573                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
574
575   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
576                  MPI_COMM_WORLD);
577
578   TRACE_smpi_comm_out(Actor::self()->getPid());
579   log_timed_action (action, clock);
580 }
581
582 static void action_scatterv(simgrid::xbt::ReplayAction& action)
583 {
584   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
585        0 gather 68 10 10 10 68 0 0 0
586      where:
587        1) 68 10 10 10 is the sendcounts
588        2) 68 is the recvcount
589        3) 0 is the root node
590        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
591        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
592   */
593   double clock  = smpi_process()->simulated_elapsed();
594   unsigned long comm_size = MPI_COMM_WORLD->size();
595   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
596   int recv_size = parse_double(action[2 + comm_size]);
597   std::vector<int> disps(comm_size, 0);
598   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
599
600   MPI_Datatype MPI_CURRENT_TYPE =
601       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
602   MPI_Datatype MPI_CURRENT_TYPE2{
603       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
604
605   void* send = nullptr;
606   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
607   for (unsigned int i = 0; i < comm_size; i++) {
608     (*sendcounts)[i] = std::stoi(action[i + 2]);
609   }
610   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
611
612   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
613   int rank = MPI_COMM_WORLD->rank();
614
615   if (rank == root)
616     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
617
618   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
619                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
620                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
621
622   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
623                   MPI_COMM_WORLD);
624
625   TRACE_smpi_comm_out(Actor::self()->getPid());
626   log_timed_action(action, clock);
627 }
628
629 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
630 {
631   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
632        0 reduceScatter 275427 275427 275427 204020 11346849 0
633      where:
634        1) The first four values after the name of the action declare the recvcounts array
635        2) The value 11346849 is the amount of instructions
636        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
637  */
638   double clock = smpi_process()->simulated_elapsed();
639   unsigned long comm_size = MPI_COMM_WORLD->size();
640   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
641   int comp_size = parse_double(action[2+comm_size]);
642   int my_proc_id                     = Actor::self()->getPid();
643   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
644   MPI_Datatype MPI_CURRENT_TYPE =
645       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
646
647   for (unsigned int i = 0; i < comm_size; i++) {
648     recvcounts->push_back(std::stoi(action[i + 2]));
649   }
650   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
651
652   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
653                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
654                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
655                                                        Datatype::encode(MPI_CURRENT_TYPE)));
656
657   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
658   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
659
660   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
661   smpi_execute_flops(comp_size);
662
663   TRACE_smpi_comm_out(my_proc_id);
664   log_timed_action (action, clock);
665 }
666
667 static void action_allgather(simgrid::xbt::ReplayAction& action)
668 {
669   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
670         0 allGather 275427 275427
671     where:
672         1) 275427 is the sendcount
673         2) 275427 is the recvcount
674         3) No more values mean that the datatype for sent and receive buffer is the default one, see
675     simgrid::smpi::Datatype::decode().
676   */
677   double clock = smpi_process()->simulated_elapsed();
678
679   CHECK_ACTION_PARAMS(action, 2, 2)
680   int sendcount = std::stoi(action[2]);
681   int recvcount = std::stoi(action[3]);
682
683   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
684   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
685
686   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
687   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
688
689   int my_proc_id = Actor::self()->getPid();
690
691   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
692                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
693                                                     Datatype::encode(MPI_CURRENT_TYPE),
694                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
695
696   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
697
698   TRACE_smpi_comm_out(my_proc_id);
699   log_timed_action (action, clock);
700 }
701
702 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
703 {
704   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
705         0 allGatherV 275427 275427 275427 275427 204020
706      where:
707         1) 275427 is the sendcount
708         2) The next four elements declare the recvcounts array
709         3) No more values mean that the datatype for sent and receive buffer is the default one, see
710      simgrid::smpi::Datatype::decode().
711   */
712   double clock = smpi_process()->simulated_elapsed();
713
714   unsigned long comm_size = MPI_COMM_WORLD->size();
715   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
716   int sendcount = std::stoi(action[2]);
717   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
718   std::vector<int> disps(comm_size, 0);
719
720   int datatype_index = 0, disp_index = 0;
721   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
722     datatype_index = 3 + comm_size;
723     disp_index     = datatype_index + 1;
724   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
725     datatype_index = -1;
726     disp_index     = 3 + comm_size;
727   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
728     datatype_index = 3 + comm_size;
729   }
730
731   if (disp_index != 0) {
732     for (unsigned int i = 0; i < comm_size; i++)
733       disps[i]          = std::stoi(action[disp_index + i]);
734   }
735
736   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
737                                                      : MPI_DEFAULT_TYPE};
738   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
739                                                       : MPI_DEFAULT_TYPE};
740
741   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
742
743   for (unsigned int i = 0; i < comm_size; i++) {
744     (*recvcounts)[i] = std::stoi(action[i + 3]);
745   }
746   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
747   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
748
749   int my_proc_id = Actor::self()->getPid();
750
751   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
752                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
753                                                        Datatype::encode(MPI_CURRENT_TYPE),
754                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
755
756   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
757                     MPI_COMM_WORLD);
758
759   TRACE_smpi_comm_out(my_proc_id);
760   log_timed_action (action, clock);
761 }
762
763 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
764 {
765   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
766         0 allToAllV 100 1 7 10 12 100 1 70 10 5
767      where:
768         1) 100 is the size of the send buffer *sizeof(int),
769         2) 1 7 10 12 is the sendcounts array
770         3) 100*sizeof(int) is the size of the receiver buffer
771         4)  1 70 10 5 is the recvcounts array
772   */
773   double clock = smpi_process()->simulated_elapsed();
774
775   unsigned long comm_size = MPI_COMM_WORLD->size();
776   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
777   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
778   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
779   std::vector<int> senddisps(comm_size, 0);
780   std::vector<int> recvdisps(comm_size, 0);
781
782   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
783                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
784                                       : MPI_DEFAULT_TYPE;
785   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
786                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
787                                      : MPI_DEFAULT_TYPE};
788
789   int send_buf_size=parse_double(action[2]);
790   int recv_buf_size=parse_double(action[3+comm_size]);
791   int my_proc_id = Actor::self()->getPid();
792   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
793   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
794
795   for (unsigned int i = 0; i < comm_size; i++) {
796     (*sendcounts)[i] = std::stoi(action[3 + i]);
797     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
798   }
799   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
800   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
801
802   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
803                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
804                                                        Datatype::encode(MPI_CURRENT_TYPE),
805                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
806
807   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
808                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
809
810   TRACE_smpi_comm_out(my_proc_id);
811   log_timed_action (action, clock);
812 }
813
814 }} // namespace simgrid::smpi
815
816 /** @brief Only initialize the replay, don't do it for real */
817 void smpi_replay_init(int* argc, char*** argv)
818 {
819   simgrid::smpi::Process::init(argc, argv);
820   smpi_process()->mark_as_initialized();
821   smpi_process()->set_replaying(true);
822
823   int my_proc_id = Actor::self()->getPid();
824   TRACE_smpi_init(my_proc_id);
825   TRACE_smpi_computing_init(my_proc_id);
826   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
827   TRACE_smpi_comm_out(my_proc_id);
828   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
829   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
830   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
831   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
832   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
833
834   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
835   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
836   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
837   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
838   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
839   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
840   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
841   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
842   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
843   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
844   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
845   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
846   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
847   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
848   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
849   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
850   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
851   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
852   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
853   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
854   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
855
856   //if we have a delayed start, sleep here.
857   if(*argc>2){
858     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
859     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
860     smpi_execute_flops(value);
861   } else {
862     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
863     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
864     smpi_execute_flops(0.0);
865   }
866 }
867
868 /** @brief actually run the replay after initialization */
869 void smpi_replay_main(int* argc, char*** argv)
870 {
871   simgrid::xbt::replay_runner(*argc, *argv);
872
873   /* and now, finalize everything */
874   /* One active process will stop. Decrease the counter*/
875   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
876   if (not get_reqq_self()->empty()) {
877     unsigned int count_requests=get_reqq_self()->size();
878     MPI_Request requests[count_requests];
879     MPI_Status status[count_requests];
880     unsigned int i=0;
881
882     for (auto const& req : *get_reqq_self()) {
883       requests[i] = req;
884       i++;
885     }
886     simgrid::smpi::Request::waitall(count_requests, requests, status);
887   }
888   delete get_reqq_self();
889   active_processes--;
890
891   if(active_processes==0){
892     /* Last process alive speaking: end the simulated timer */
893     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
894     smpi_free_replay_tmp_buffers();
895   }
896
897   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
898
899   smpi_process()->finalize();
900
901   TRACE_smpi_comm_out(Actor::self()->getPid());
902   TRACE_smpi_finalize(Actor::self()->getPid());
903 }
904
905 /** @brief chain a replay initialization and a replay start */
906 void smpi_replay_run(int* argc, char*** argv)
907 {
908   smpi_replay_init(argc, argv);
909   smpi_replay_main(argc, argv);
910 }