Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Use correct buffer (recvbuffer)
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 class BcastArgParser : public ActionArgParser {
104 public:
105   double size;
106   int root;
107   MPI_Datatype datatype = MPI_DEFAULT_TYPE;
108   void parse(simgrid::xbt::ReplayAction& action) override
109   {
110     CHECK_ACTION_PARAMS(action, 1, 2)
111     size = parse_double(action[2]);
112     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
113     if (action.size() > 4)
114       datatype = simgrid::smpi::Datatype::decode(action[4]);
115   }
116 };
117
118 template <class T> class ReplayAction {
119 protected:
120   const std::string name;
121   T args;
122
123   int my_proc_id;
124
125 public:
126   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
127
128   virtual void execute(simgrid::xbt::ReplayAction& action)
129   {
130     // Needs to be re-initialized for every action, hence here
131     double start_time = smpi_process()->simulated_elapsed();
132     args.parse(action);
133     kernel(action);
134     if (name != "Init")
135       log_timed_action(action, start_time);
136   }
137
138   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
139 };
140
141 class WaitAction : public ReplayAction<ActionArgParser> {
142 public:
143   WaitAction() : ReplayAction("Wait") {}
144   void kernel(simgrid::xbt::ReplayAction& action) override
145   {
146     std::string s = boost::algorithm::join(action, " ");
147     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
148     MPI_Request request = get_reqq_self()->back();
149     get_reqq_self()->pop_back();
150
151     if (request == nullptr) {
152       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
153        * return.*/
154       return;
155     }
156
157     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
158
159     // Must be taken before Request::wait() since the request may be set to
160     // MPI_REQUEST_NULL by Request::wait!
161     int src                  = request->comm()->group()->rank(request->src());
162     int dst                  = request->comm()->group()->rank(request->dst());
163     bool is_wait_for_receive = (request->flags() & RECV);
164     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
165     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
166
167     MPI_Status status;
168     Request::wait(&request, &status);
169
170     TRACE_smpi_comm_out(rank);
171     if (is_wait_for_receive)
172       TRACE_smpi_recv(src, dst, 0);
173   }
174 };
175
176 class SendAction : public ReplayAction<SendRecvParser> {
177 public:
178   SendAction() = delete;
179   SendAction(std::string name) : ReplayAction(name) {}
180   void kernel(simgrid::xbt::ReplayAction& action) override
181   {
182     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
183
184     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
185                                                                                  Datatype::encode(args.datatype1)));
186     if (not TRACE_smpi_view_internals())
187       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
188
189     if (name == "send") {
190       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
191     } else if (name == "Isend") {
192       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
193       get_reqq_self()->push_back(request);
194     } else {
195       xbt_die("Don't know this action, %s", name.c_str());
196     }
197
198     TRACE_smpi_comm_out(my_proc_id);
199   }
200 };
201
202 class RecvAction : public ReplayAction<SendRecvParser> {
203 public:
204   RecvAction() = delete;
205   explicit RecvAction(std::string name) : ReplayAction(name) {}
206   void kernel(simgrid::xbt::ReplayAction& action) override
207   {
208     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
209
210     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
211                                                                                  Datatype::encode(args.datatype1)));
212
213     MPI_Status status;
214     // unknown size from the receiver point of view
215     if (args.size <= 0.0) {
216       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
217       args.size = status.count;
218     }
219
220     if (name == "recv") {
221       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
222     } else if (name == "Irecv") {
223       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
224       get_reqq_self()->push_back(request);
225     }
226
227     TRACE_smpi_comm_out(my_proc_id);
228     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
229     if (name == "recv" && not TRACE_smpi_view_internals()) {
230       TRACE_smpi_recv(src_traced, my_proc_id, 0);
231     }
232   }
233 };
234
235 class ComputeAction : public ReplayAction<ComputeParser> {
236 public:
237   ComputeAction() : ReplayAction("compute") {}
238   void kernel(simgrid::xbt::ReplayAction& action) override
239   {
240     TRACE_smpi_computing_in(my_proc_id, args.flops);
241     smpi_execute_flops(args.flops);
242     TRACE_smpi_computing_out(my_proc_id);
243   }
244 };
245
246 class TestAction : public ReplayAction<ActionArgParser> {
247 public:
248   TestAction() : ReplayAction("Test") {}
249   void kernel(simgrid::xbt::ReplayAction& action) override
250   {
251     MPI_Request request = get_reqq_self()->back();
252     get_reqq_self()->pop_back();
253     // if request is null here, this may mean that a previous test has succeeded
254     // Different times in traced application and replayed version may lead to this
255     // In this case, ignore the extra calls.
256     if (request != nullptr) {
257       TRACE_smpi_testing_in(my_proc_id);
258
259       MPI_Status status;
260       int flag = Request::test(&request, &status);
261
262       XBT_DEBUG("MPI_Test result: %d", flag);
263       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
264        * nullptr.*/
265       get_reqq_self()->push_back(request);
266
267       TRACE_smpi_testing_out(my_proc_id);
268     }
269   }
270 };
271
272 class InitAction : public ReplayAction<ActionArgParser> {
273 public:
274   InitAction() : ReplayAction("Init") {}
275   void kernel(simgrid::xbt::ReplayAction& action) override
276   {
277     CHECK_ACTION_PARAMS(action, 0, 1)
278     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
279                                            : MPI_BYTE;  // default TAU datatype
280
281     /* start a simulated timer */
282     smpi_process()->simulated_start();
283     /*initialize the number of active processes */
284     active_processes = smpi_process_count();
285
286     set_reqq_self(new std::vector<MPI_Request>);
287   }
288 };
289
290 class CommunicatorAction : public ReplayAction<ActionArgParser> {
291 public:
292   CommunicatorAction() : ReplayAction("Comm") {}
293   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
294 };
295
296 class WaitAllAction : public ReplayAction<ActionArgParser> {
297 public:
298   WaitAllAction() : ReplayAction("waitAll") {}
299   void kernel(simgrid::xbt::ReplayAction& action) override
300   {
301     const unsigned int count_requests = get_reqq_self()->size();
302
303     if (count_requests > 0) {
304       TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
305                          new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
306       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
307       for (const auto& req : (*get_reqq_self())) {
308         if (req && (req->flags() & RECV)) {
309           sender_receiver.push_back({req->src(), req->dst()});
310         }
311       }
312       MPI_Status status[count_requests];
313       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
314
315       for (auto& pair : sender_receiver) {
316         TRACE_smpi_recv(pair.first, pair.second, 0);
317       }
318       TRACE_smpi_comm_out(my_proc_id);
319     }
320   }
321 };
322
323 class BarrierAction : public ReplayAction<ActionArgParser> {
324 public:
325   BarrierAction() : ReplayAction("barrier") {}
326   void kernel(simgrid::xbt::ReplayAction& action) override
327   {
328     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
329     Colls::barrier(MPI_COMM_WORLD);
330     TRACE_smpi_comm_out(my_proc_id);
331   }
332 };
333
334 class BcastAction : public ReplayAction<BcastArgParser> {
335 public:
336   BcastAction() : ReplayAction("bcast") {}
337   void kernel(simgrid::xbt::ReplayAction& action) override
338   {
339     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
340                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), 
341                                                       -1.0, args.size, -1, Datatype::encode(args.datatype), ""));
342
343     void* sendbuf = smpi_get_tmp_sendbuffer(args.size * args.datatype->size());
344
345     Colls::bcast(sendbuf, args.size, args.datatype, args.root, MPI_COMM_WORLD);
346
347     TRACE_smpi_comm_out(my_proc_id);
348   }
349 };
350
351
352 static void action_reduce(simgrid::xbt::ReplayAction& action)
353 {
354   CHECK_ACTION_PARAMS(action, 2, 2)
355   double comm_size = parse_double(action[2]);
356   double comp_size = parse_double(action[3]);
357   double clock = smpi_process()->simulated_elapsed();
358   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
359
360   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
361
362   int my_proc_id = Actor::self()->getPid();
363   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
364                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
365                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
366
367   void* recvbuf = smpi_get_tmp_recvbuffer(comm_size * MPI_CURRENT_TYPE->size());
368   void* sendbuf = smpi_get_tmp_sendbuffer(comm_size * MPI_CURRENT_TYPE->size());
369   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
370   smpi_execute_flops(comp_size);
371
372   TRACE_smpi_comm_out(my_proc_id);
373   log_timed_action (action, clock);
374 }
375
376 static void action_allReduce(simgrid::xbt::ReplayAction& action)
377 {
378   CHECK_ACTION_PARAMS(action, 2, 1)
379   double comm_size = parse_double(action[2]);
380   double comp_size = parse_double(action[3]);
381
382   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
383
384   double clock = smpi_process()->simulated_elapsed();
385   int my_proc_id = Actor::self()->getPid();
386   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
387                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
388
389   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
390   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
391   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
392   smpi_execute_flops(comp_size);
393
394   TRACE_smpi_comm_out(my_proc_id);
395   log_timed_action (action, clock);
396 }
397
398 static void action_allToAll(simgrid::xbt::ReplayAction& action)
399 {
400   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
401   double clock = smpi_process()->simulated_elapsed();
402   unsigned long comm_size = MPI_COMM_WORLD->size();
403   int send_size = parse_double(action[2]);
404   int recv_size = parse_double(action[3]);
405   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
406   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
407
408   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
409   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
410
411   int my_proc_id = Actor::self()->getPid();
412   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
413                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
414                                                     Datatype::encode(MPI_CURRENT_TYPE),
415                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
416
417   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
418
419   TRACE_smpi_comm_out(my_proc_id);
420   log_timed_action (action, clock);
421 }
422
423 static void action_gather(simgrid::xbt::ReplayAction& action)
424 {
425   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
426         0 gather 68 68 0 0 0
427       where:
428         1) 68 is the sendcounts
429         2) 68 is the recvcounts
430         3) 0 is the root node
431         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
432         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
433   */
434   CHECK_ACTION_PARAMS(action, 2, 3)
435   double clock = smpi_process()->simulated_elapsed();
436   unsigned long comm_size = MPI_COMM_WORLD->size();
437   int send_size = parse_double(action[2]);
438   int recv_size = parse_double(action[3]);
439   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
440   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
441
442   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
443   void *recv = nullptr;
444   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
445   int rank = MPI_COMM_WORLD->rank();
446
447   if(rank==root)
448     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
449
450   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
451                                                                         Datatype::encode(MPI_CURRENT_TYPE),
452                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
453
454   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
455
456   TRACE_smpi_comm_out(Actor::self()->getPid());
457   log_timed_action (action, clock);
458 }
459
460 static void action_scatter(simgrid::xbt::ReplayAction& action)
461 {
462   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
463         0 gather 68 68 0 0 0
464       where:
465         1) 68 is the sendcounts
466         2) 68 is the recvcounts
467         3) 0 is the root node
468         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
469         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
470   */
471   CHECK_ACTION_PARAMS(action, 2, 3)
472   double clock                   = smpi_process()->simulated_elapsed();
473   unsigned long comm_size        = MPI_COMM_WORLD->size();
474   int send_size                  = parse_double(action[2]);
475   int recv_size                  = parse_double(action[3]);
476   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
477   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
478
479   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
480   void* recv = nullptr;
481   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
482   int rank = MPI_COMM_WORLD->rank();
483
484   if (rank == root)
485     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
486
487   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
488                                                                         Datatype::encode(MPI_CURRENT_TYPE),
489                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
490
491   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
492
493   TRACE_smpi_comm_out(Actor::self()->getPid());
494   log_timed_action(action, clock);
495 }
496
497 static void action_gatherv(simgrid::xbt::ReplayAction& action)
498 {
499   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
500        0 gather 68 68 10 10 10 0 0 0
501      where:
502        1) 68 is the sendcount
503        2) 68 10 10 10 is the recvcounts
504        3) 0 is the root node
505        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
506        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
507   */
508   double clock = smpi_process()->simulated_elapsed();
509   unsigned long comm_size = MPI_COMM_WORLD->size();
510   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
511   int send_size = parse_double(action[2]);
512   std::vector<int> disps(comm_size, 0);
513   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
514
515   MPI_Datatype MPI_CURRENT_TYPE =
516       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
517   MPI_Datatype MPI_CURRENT_TYPE2{
518       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
519
520   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
521   void *recv = nullptr;
522   for (unsigned int i = 0; i < comm_size; i++) {
523     (*recvcounts)[i] = std::stoi(action[i + 3]);
524   }
525   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
526
527   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
528   int rank = MPI_COMM_WORLD->rank();
529
530   if(rank==root)
531     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
532
533   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
534                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
535                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
536
537   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
538                  MPI_COMM_WORLD);
539
540   TRACE_smpi_comm_out(Actor::self()->getPid());
541   log_timed_action (action, clock);
542 }
543
544 static void action_scatterv(simgrid::xbt::ReplayAction& action)
545 {
546   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
547        0 gather 68 10 10 10 68 0 0 0
548      where:
549        1) 68 10 10 10 is the sendcounts
550        2) 68 is the recvcount
551        3) 0 is the root node
552        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
553        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
554   */
555   double clock  = smpi_process()->simulated_elapsed();
556   unsigned long comm_size = MPI_COMM_WORLD->size();
557   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
558   int recv_size = parse_double(action[2 + comm_size]);
559   std::vector<int> disps(comm_size, 0);
560   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
561
562   MPI_Datatype MPI_CURRENT_TYPE =
563       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
564   MPI_Datatype MPI_CURRENT_TYPE2{
565       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
566
567   void* send = nullptr;
568   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
569   for (unsigned int i = 0; i < comm_size; i++) {
570     (*sendcounts)[i] = std::stoi(action[i + 2]);
571   }
572   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
573
574   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
575   int rank = MPI_COMM_WORLD->rank();
576
577   if (rank == root)
578     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
579
580   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
581                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
582                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
583
584   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
585                   MPI_COMM_WORLD);
586
587   TRACE_smpi_comm_out(Actor::self()->getPid());
588   log_timed_action(action, clock);
589 }
590
591 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
592 {
593   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
594        0 reduceScatter 275427 275427 275427 204020 11346849 0
595      where:
596        1) The first four values after the name of the action declare the recvcounts array
597        2) The value 11346849 is the amount of instructions
598        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
599  */
600   double clock = smpi_process()->simulated_elapsed();
601   unsigned long comm_size = MPI_COMM_WORLD->size();
602   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
603   int comp_size = parse_double(action[2+comm_size]);
604   int my_proc_id                     = Actor::self()->getPid();
605   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
606   MPI_Datatype MPI_CURRENT_TYPE =
607       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
608
609   for (unsigned int i = 0; i < comm_size; i++) {
610     recvcounts->push_back(std::stoi(action[i + 2]));
611   }
612   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
613
614   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
615                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
616                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
617                                                        Datatype::encode(MPI_CURRENT_TYPE)));
618
619   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
620   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
621
622   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
623   smpi_execute_flops(comp_size);
624
625   TRACE_smpi_comm_out(my_proc_id);
626   log_timed_action (action, clock);
627 }
628
629 static void action_allgather(simgrid::xbt::ReplayAction& action)
630 {
631   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
632         0 allGather 275427 275427
633     where:
634         1) 275427 is the sendcount
635         2) 275427 is the recvcount
636         3) No more values mean that the datatype for sent and receive buffer is the default one, see
637     simgrid::smpi::Datatype::decode().
638   */
639   double clock = smpi_process()->simulated_elapsed();
640
641   CHECK_ACTION_PARAMS(action, 2, 2)
642   int sendcount = std::stoi(action[2]);
643   int recvcount = std::stoi(action[3]);
644
645   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
646   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
647
648   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
649   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
650
651   int my_proc_id = Actor::self()->getPid();
652
653   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
654                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
655                                                     Datatype::encode(MPI_CURRENT_TYPE),
656                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
657
658   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
659
660   TRACE_smpi_comm_out(my_proc_id);
661   log_timed_action (action, clock);
662 }
663
664 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
665 {
666   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
667         0 allGatherV 275427 275427 275427 275427 204020
668      where:
669         1) 275427 is the sendcount
670         2) The next four elements declare the recvcounts array
671         3) No more values mean that the datatype for sent and receive buffer is the default one, see
672      simgrid::smpi::Datatype::decode().
673   */
674   double clock = smpi_process()->simulated_elapsed();
675
676   unsigned long comm_size = MPI_COMM_WORLD->size();
677   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
678   int sendcount = std::stoi(action[2]);
679   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
680   std::vector<int> disps(comm_size, 0);
681
682   int datatype_index = 0, disp_index = 0;
683   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
684     datatype_index = 3 + comm_size;
685     disp_index     = datatype_index + 1;
686   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
687     datatype_index = -1;
688     disp_index     = 3 + comm_size;
689   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
690     datatype_index = 3 + comm_size;
691   }
692
693   if (disp_index != 0) {
694     for (unsigned int i = 0; i < comm_size; i++)
695       disps[i]          = std::stoi(action[disp_index + i]);
696   }
697
698   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
699                                                      : MPI_DEFAULT_TYPE};
700   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
701                                                       : MPI_DEFAULT_TYPE};
702
703   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
704
705   for (unsigned int i = 0; i < comm_size; i++) {
706     (*recvcounts)[i] = std::stoi(action[i + 3]);
707   }
708   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
709   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
710
711   int my_proc_id = Actor::self()->getPid();
712
713   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
714                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
715                                                        Datatype::encode(MPI_CURRENT_TYPE),
716                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
717
718   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
719                     MPI_COMM_WORLD);
720
721   TRACE_smpi_comm_out(my_proc_id);
722   log_timed_action (action, clock);
723 }
724
725 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
726 {
727   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
728         0 allToAllV 100 1 7 10 12 100 1 70 10 5
729      where:
730         1) 100 is the size of the send buffer *sizeof(int),
731         2) 1 7 10 12 is the sendcounts array
732         3) 100*sizeof(int) is the size of the receiver buffer
733         4)  1 70 10 5 is the recvcounts array
734   */
735   double clock = smpi_process()->simulated_elapsed();
736
737   unsigned long comm_size = MPI_COMM_WORLD->size();
738   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
739   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
740   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
741   std::vector<int> senddisps(comm_size, 0);
742   std::vector<int> recvdisps(comm_size, 0);
743
744   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
745                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
746                                       : MPI_DEFAULT_TYPE;
747   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
748                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
749                                      : MPI_DEFAULT_TYPE};
750
751   int send_buf_size=parse_double(action[2]);
752   int recv_buf_size=parse_double(action[3+comm_size]);
753   int my_proc_id = Actor::self()->getPid();
754   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
755   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
756
757   for (unsigned int i = 0; i < comm_size; i++) {
758     (*sendcounts)[i] = std::stoi(action[3 + i]);
759     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
760   }
761   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
762   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
763
764   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
765                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
766                                                        Datatype::encode(MPI_CURRENT_TYPE),
767                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
768
769   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
770                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
771
772   TRACE_smpi_comm_out(my_proc_id);
773   log_timed_action (action, clock);
774 }
775
776 }} // namespace simgrid::smpi
777
778 /** @brief Only initialize the replay, don't do it for real */
779 void smpi_replay_init(int* argc, char*** argv)
780 {
781   simgrid::smpi::Process::init(argc, argv);
782   smpi_process()->mark_as_initialized();
783   smpi_process()->set_replaying(true);
784
785   int my_proc_id = Actor::self()->getPid();
786   TRACE_smpi_init(my_proc_id);
787   TRACE_smpi_computing_init(my_proc_id);
788   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
789   TRACE_smpi_comm_out(my_proc_id);
790   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
791   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
792   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
793   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
794   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
795
796   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
797   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
798   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
799   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
800   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
801   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
802   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
803   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
804   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
805   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
806   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
807   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
808   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
809   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
810   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
811   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
812   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
813   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
814   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
815   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
816   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
817
818   //if we have a delayed start, sleep here.
819   if(*argc>2){
820     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
821     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
822     smpi_execute_flops(value);
823   } else {
824     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
825     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
826     smpi_execute_flops(0.0);
827   }
828 }
829
830 /** @brief actually run the replay after initialization */
831 void smpi_replay_main(int* argc, char*** argv)
832 {
833   simgrid::xbt::replay_runner(*argc, *argv);
834
835   /* and now, finalize everything */
836   /* One active process will stop. Decrease the counter*/
837   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
838   if (not get_reqq_self()->empty()) {
839     unsigned int count_requests=get_reqq_self()->size();
840     MPI_Request requests[count_requests];
841     MPI_Status status[count_requests];
842     unsigned int i=0;
843
844     for (auto const& req : *get_reqq_self()) {
845       requests[i] = req;
846       i++;
847     }
848     simgrid::smpi::Request::waitall(count_requests, requests, status);
849   }
850   delete get_reqq_self();
851   active_processes--;
852
853   if(active_processes==0){
854     /* Last process alive speaking: end the simulated timer */
855     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
856     smpi_free_replay_tmp_buffers();
857   }
858
859   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
860
861   smpi_process()->finalize();
862
863   TRACE_smpi_comm_out(Actor::self()->getPid());
864   TRACE_smpi_finalize(Actor::self()->getPid());
865 }
866
867 /** @brief chain a replay initialization and a replay start */
868 void smpi_replay_run(int* argc, char*** argv)
869 {
870   smpi_replay_init(argc, argv);
871   smpi_replay_main(argc, argv);
872 }