Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f5cd6897a982c4295f120bd381fb8d04268caa44
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 template <class T> class ReplayAction {
104 protected:
105   const std::string name;
106   T args;
107
108   int my_proc_id;
109
110 public:
111   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
112
113   virtual void execute(simgrid::xbt::ReplayAction& action)
114   {
115     // Needs to be re-initialized for every action, hence here
116     double start_time = smpi_process()->simulated_elapsed();
117     args.parse(action);
118     kernel(action);
119     if (name != "Init")
120       log_timed_action(action, start_time);
121   }
122
123   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
124 };
125
126 class WaitAction : public ReplayAction<ActionArgParser> {
127 public:
128   WaitAction() : ReplayAction("Wait") {}
129   void kernel(simgrid::xbt::ReplayAction& action) override
130   {
131     CHECK_ACTION_PARAMS(action, 0, 0)
132     MPI_Status status;
133
134     std::string s = boost::algorithm::join(action, " ");
135     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
136     MPI_Request request = get_reqq_self()->back();
137     get_reqq_self()->pop_back();
138
139     if (request == nullptr) {
140       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
141        * return.*/
142       return;
143     }
144
145     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
146
147     // Must be taken before Request::wait() since the request may be set to
148     // MPI_REQUEST_NULL by Request::wait!
149     int src                  = request->comm()->group()->rank(request->src());
150     int dst                  = request->comm()->group()->rank(request->dst());
151     bool is_wait_for_receive = (request->flags() & RECV);
152     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
153     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
154
155     Request::wait(&request, &status);
156
157     TRACE_smpi_comm_out(rank);
158     if (is_wait_for_receive)
159       TRACE_smpi_recv(src, dst, 0);
160   }
161 };
162
163 class SendAction : public ReplayAction<SendRecvParser> {
164 public:
165   SendAction() = delete;
166   SendAction(std::string name) : ReplayAction(name) {}
167   void kernel(simgrid::xbt::ReplayAction& action) override
168   {
169     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
170
171     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
172                                                                                  Datatype::encode(args.datatype1)));
173     if (not TRACE_smpi_view_internals())
174       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
175
176     if (name == "send") {
177       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
178     } else if (name == "Isend") {
179       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
180       get_reqq_self()->push_back(request);
181     } else {
182       xbt_die("Don't know this action, %s", name.c_str());
183     }
184
185     TRACE_smpi_comm_out(my_proc_id);
186   }
187 };
188
189 class RecvAction : public ReplayAction<SendRecvParser> {
190 public:
191   RecvAction() = delete;
192   explicit RecvAction(std::string name) : ReplayAction(name) {}
193   void kernel(simgrid::xbt::ReplayAction& action) override
194   {
195     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
196
197     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
198                                                                                  Datatype::encode(args.datatype1)));
199
200     MPI_Status status;
201     // unknown size from the receiver point of view
202     if (args.size <= 0.0) {
203       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
204       args.size = status.count;
205     }
206
207     if (name == "recv") {
208       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
209     } else if (name == "Irecv") {
210       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
211       get_reqq_self()->push_back(request);
212     }
213
214     TRACE_smpi_comm_out(my_proc_id);
215     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
216     if (name == "recv" && not TRACE_smpi_view_internals()) {
217       TRACE_smpi_recv(src_traced, my_proc_id, 0);
218     }
219   }
220 };
221
222 class ComputeAction : public ReplayAction<ComputeParser> {
223 public:
224   ComputeAction() : ReplayAction("compute") {}
225   void kernel(simgrid::xbt::ReplayAction& action) override
226   {
227     TRACE_smpi_computing_in(my_proc_id, args.flops);
228     smpi_execute_flops(args.flops);
229     TRACE_smpi_computing_out(my_proc_id);
230   }
231 };
232
233 class TestAction : public ReplayAction<ActionArgParser> {
234 public:
235   TestAction() : ReplayAction("Test") {}
236   void kernel(simgrid::xbt::ReplayAction& action) override
237   {
238     MPI_Request request = get_reqq_self()->back();
239     get_reqq_self()->pop_back();
240     // if request is null here, this may mean that a previous test has succeeded
241     // Different times in traced application and replayed version may lead to this
242     // In this case, ignore the extra calls.
243     if (request != nullptr) {
244       TRACE_smpi_testing_in(my_proc_id);
245
246       MPI_Status status;
247       int flag = Request::test(&request, &status);
248
249       XBT_DEBUG("MPI_Test result: %d", flag);
250       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
251        * nullptr.*/
252       get_reqq_self()->push_back(request);
253
254       TRACE_smpi_testing_out(my_proc_id);
255     }
256   }
257 };
258
259 class InitAction : public ReplayAction<ActionArgParser> {
260 public:
261   InitAction() : ReplayAction("Init") {}
262   void kernel(simgrid::xbt::ReplayAction& action) override
263   {
264     CHECK_ACTION_PARAMS(action, 0, 1)
265     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
266                                            : MPI_BYTE;  // default TAU datatype
267
268     /* start a simulated timer */
269     smpi_process()->simulated_start();
270     /*initialize the number of active processes */
271     active_processes = smpi_process_count();
272
273     set_reqq_self(new std::vector<MPI_Request>);
274   }
275 };
276
277 class CommunicatorAction : public ReplayAction<ActionArgParser> {
278 public:
279   CommunicatorAction() : ReplayAction("Comm") {}
280   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
281 };
282
283 class WaitAllAction : public ReplayAction<ActionArgParser> {
284 public:
285   WaitAllAction() : ReplayAction("waitAll") {}
286   void kernel(simgrid::xbt::ReplayAction& action) override
287   {
288     const unsigned int count_requests = get_reqq_self()->size();
289
290     if (count_requests > 0) {
291       TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
292                          new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
293       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
294       for (const auto& req : (*get_reqq_self())) {
295         if (req && (req->flags() & RECV)) {
296           sender_receiver.push_back({req->src(), req->dst()});
297         }
298       }
299       MPI_Status status[count_requests];
300       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
301
302       for (auto& pair : sender_receiver) {
303         TRACE_smpi_recv(pair.first, pair.second, 0);
304       }
305       TRACE_smpi_comm_out(my_proc_id);
306     }
307   }
308 };
309
310 } // Replay Namespace
311
312 static void action_barrier(simgrid::xbt::ReplayAction& action)
313 {
314   double clock = smpi_process()->simulated_elapsed();
315   int my_proc_id = Actor::self()->getPid();
316   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
317
318   Colls::barrier(MPI_COMM_WORLD);
319
320   TRACE_smpi_comm_out(my_proc_id);
321   log_timed_action (action, clock);
322 }
323
324 static void action_bcast(simgrid::xbt::ReplayAction& action)
325 {
326   CHECK_ACTION_PARAMS(action, 1, 2)
327   double size = parse_double(action[2]);
328   double clock = smpi_process()->simulated_elapsed();
329   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
330   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
331   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
332
333   int my_proc_id = Actor::self()->getPid();
334   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
335                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
336                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
337
338   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
339
340   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
341
342   TRACE_smpi_comm_out(my_proc_id);
343   log_timed_action (action, clock);
344 }
345
346 static void action_reduce(simgrid::xbt::ReplayAction& action)
347 {
348   CHECK_ACTION_PARAMS(action, 2, 2)
349   double comm_size = parse_double(action[2]);
350   double comp_size = parse_double(action[3]);
351   double clock = smpi_process()->simulated_elapsed();
352   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
353
354   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
355
356   int my_proc_id = Actor::self()->getPid();
357   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
358                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
359                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
360
361   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
362   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
363   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
364   smpi_execute_flops(comp_size);
365
366   TRACE_smpi_comm_out(my_proc_id);
367   log_timed_action (action, clock);
368 }
369
370 static void action_allReduce(simgrid::xbt::ReplayAction& action)
371 {
372   CHECK_ACTION_PARAMS(action, 2, 1)
373   double comm_size = parse_double(action[2]);
374   double comp_size = parse_double(action[3]);
375
376   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
377
378   double clock = smpi_process()->simulated_elapsed();
379   int my_proc_id = Actor::self()->getPid();
380   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
381                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
382
383   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
384   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
385   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
386   smpi_execute_flops(comp_size);
387
388   TRACE_smpi_comm_out(my_proc_id);
389   log_timed_action (action, clock);
390 }
391
392 static void action_allToAll(simgrid::xbt::ReplayAction& action)
393 {
394   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
395   double clock = smpi_process()->simulated_elapsed();
396   unsigned long comm_size = MPI_COMM_WORLD->size();
397   int send_size = parse_double(action[2]);
398   int recv_size = parse_double(action[3]);
399   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
400   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
401
402   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
403   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
404
405   int my_proc_id = Actor::self()->getPid();
406   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
407                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
408                                                     Datatype::encode(MPI_CURRENT_TYPE),
409                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
410
411   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
412
413   TRACE_smpi_comm_out(my_proc_id);
414   log_timed_action (action, clock);
415 }
416
417 static void action_gather(simgrid::xbt::ReplayAction& action)
418 {
419   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
420         0 gather 68 68 0 0 0
421       where:
422         1) 68 is the sendcounts
423         2) 68 is the recvcounts
424         3) 0 is the root node
425         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
426         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
427   */
428   CHECK_ACTION_PARAMS(action, 2, 3)
429   double clock = smpi_process()->simulated_elapsed();
430   unsigned long comm_size = MPI_COMM_WORLD->size();
431   int send_size = parse_double(action[2]);
432   int recv_size = parse_double(action[3]);
433   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
434   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
435
436   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
437   void *recv = nullptr;
438   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
439   int rank = MPI_COMM_WORLD->rank();
440
441   if(rank==root)
442     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
443
444   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
445                                                                         Datatype::encode(MPI_CURRENT_TYPE),
446                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
447
448   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
449
450   TRACE_smpi_comm_out(Actor::self()->getPid());
451   log_timed_action (action, clock);
452 }
453
454 static void action_scatter(simgrid::xbt::ReplayAction& action)
455 {
456   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
457         0 gather 68 68 0 0 0
458       where:
459         1) 68 is the sendcounts
460         2) 68 is the recvcounts
461         3) 0 is the root node
462         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
463         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
464   */
465   CHECK_ACTION_PARAMS(action, 2, 3)
466   double clock                   = smpi_process()->simulated_elapsed();
467   unsigned long comm_size        = MPI_COMM_WORLD->size();
468   int send_size                  = parse_double(action[2]);
469   int recv_size                  = parse_double(action[3]);
470   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
471   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
472
473   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
474   void* recv = nullptr;
475   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
476   int rank = MPI_COMM_WORLD->rank();
477
478   if (rank == root)
479     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
480
481   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
482                                                                         Datatype::encode(MPI_CURRENT_TYPE),
483                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
484
485   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
486
487   TRACE_smpi_comm_out(Actor::self()->getPid());
488   log_timed_action(action, clock);
489 }
490
491 static void action_gatherv(simgrid::xbt::ReplayAction& action)
492 {
493   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
494        0 gather 68 68 10 10 10 0 0 0
495      where:
496        1) 68 is the sendcount
497        2) 68 10 10 10 is the recvcounts
498        3) 0 is the root node
499        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
500        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
501   */
502   double clock = smpi_process()->simulated_elapsed();
503   unsigned long comm_size = MPI_COMM_WORLD->size();
504   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
505   int send_size = parse_double(action[2]);
506   std::vector<int> disps(comm_size, 0);
507   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
508
509   MPI_Datatype MPI_CURRENT_TYPE =
510       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
511   MPI_Datatype MPI_CURRENT_TYPE2{
512       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
513
514   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
515   void *recv = nullptr;
516   for (unsigned int i = 0; i < comm_size; i++) {
517     (*recvcounts)[i] = std::stoi(action[i + 3]);
518   }
519   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
520
521   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
522   int rank = MPI_COMM_WORLD->rank();
523
524   if(rank==root)
525     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
526
527   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
528                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
529                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
530
531   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
532                  MPI_COMM_WORLD);
533
534   TRACE_smpi_comm_out(Actor::self()->getPid());
535   log_timed_action (action, clock);
536 }
537
538 static void action_scatterv(simgrid::xbt::ReplayAction& action)
539 {
540   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
541        0 gather 68 10 10 10 68 0 0 0
542      where:
543        1) 68 10 10 10 is the sendcounts
544        2) 68 is the recvcount
545        3) 0 is the root node
546        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
547        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
548   */
549   double clock  = smpi_process()->simulated_elapsed();
550   unsigned long comm_size = MPI_COMM_WORLD->size();
551   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
552   int recv_size = parse_double(action[2 + comm_size]);
553   std::vector<int> disps(comm_size, 0);
554   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
555
556   MPI_Datatype MPI_CURRENT_TYPE =
557       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
558   MPI_Datatype MPI_CURRENT_TYPE2{
559       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
560
561   void* send = nullptr;
562   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
563   for (unsigned int i = 0; i < comm_size; i++) {
564     (*sendcounts)[i] = std::stoi(action[i + 2]);
565   }
566   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
567
568   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
569   int rank = MPI_COMM_WORLD->rank();
570
571   if (rank == root)
572     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
573
574   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
575                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
576                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
577
578   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
579                   MPI_COMM_WORLD);
580
581   TRACE_smpi_comm_out(Actor::self()->getPid());
582   log_timed_action(action, clock);
583 }
584
585 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
586 {
587   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
588        0 reduceScatter 275427 275427 275427 204020 11346849 0
589      where:
590        1) The first four values after the name of the action declare the recvcounts array
591        2) The value 11346849 is the amount of instructions
592        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
593  */
594   double clock = smpi_process()->simulated_elapsed();
595   unsigned long comm_size = MPI_COMM_WORLD->size();
596   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
597   int comp_size = parse_double(action[2+comm_size]);
598   int my_proc_id                     = Actor::self()->getPid();
599   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
600   MPI_Datatype MPI_CURRENT_TYPE =
601       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
602
603   for (unsigned int i = 0; i < comm_size; i++) {
604     recvcounts->push_back(std::stoi(action[i + 2]));
605   }
606   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
607
608   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
609                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
610                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
611                                                        Datatype::encode(MPI_CURRENT_TYPE)));
612
613   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
614   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
615
616   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
617   smpi_execute_flops(comp_size);
618
619   TRACE_smpi_comm_out(my_proc_id);
620   log_timed_action (action, clock);
621 }
622
623 static void action_allgather(simgrid::xbt::ReplayAction& action)
624 {
625   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
626         0 allGather 275427 275427
627     where:
628         1) 275427 is the sendcount
629         2) 275427 is the recvcount
630         3) No more values mean that the datatype for sent and receive buffer is the default one, see
631     simgrid::smpi::Datatype::decode().
632   */
633   double clock = smpi_process()->simulated_elapsed();
634
635   CHECK_ACTION_PARAMS(action, 2, 2)
636   int sendcount = std::stoi(action[2]);
637   int recvcount = std::stoi(action[3]);
638
639   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
640   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
641
642   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
643   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
644
645   int my_proc_id = Actor::self()->getPid();
646
647   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
648                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
649                                                     Datatype::encode(MPI_CURRENT_TYPE),
650                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
651
652   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
653
654   TRACE_smpi_comm_out(my_proc_id);
655   log_timed_action (action, clock);
656 }
657
658 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
659 {
660   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
661         0 allGatherV 275427 275427 275427 275427 204020
662      where:
663         1) 275427 is the sendcount
664         2) The next four elements declare the recvcounts array
665         3) No more values mean that the datatype for sent and receive buffer is the default one, see
666      simgrid::smpi::Datatype::decode().
667   */
668   double clock = smpi_process()->simulated_elapsed();
669
670   unsigned long comm_size = MPI_COMM_WORLD->size();
671   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
672   int sendcount = std::stoi(action[2]);
673   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
674   std::vector<int> disps(comm_size, 0);
675
676   int datatype_index = 0, disp_index = 0;
677   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
678     datatype_index = 3 + comm_size;
679     disp_index     = datatype_index + 1;
680   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
681     datatype_index = -1;
682     disp_index     = 3 + comm_size;
683   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
684     datatype_index = 3 + comm_size;
685   }
686
687   if (disp_index != 0) {
688     for (unsigned int i = 0; i < comm_size; i++)
689       disps[i]          = std::stoi(action[disp_index + i]);
690   }
691
692   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
693                                                      : MPI_DEFAULT_TYPE};
694   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
695                                                       : MPI_DEFAULT_TYPE};
696
697   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
698
699   for (unsigned int i = 0; i < comm_size; i++) {
700     (*recvcounts)[i] = std::stoi(action[i + 3]);
701   }
702   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
703   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
704
705   int my_proc_id = Actor::self()->getPid();
706
707   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
708                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
709                                                        Datatype::encode(MPI_CURRENT_TYPE),
710                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
711
712   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
713                     MPI_COMM_WORLD);
714
715   TRACE_smpi_comm_out(my_proc_id);
716   log_timed_action (action, clock);
717 }
718
719 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
720 {
721   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
722         0 allToAllV 100 1 7 10 12 100 1 70 10 5
723      where:
724         1) 100 is the size of the send buffer *sizeof(int),
725         2) 1 7 10 12 is the sendcounts array
726         3) 100*sizeof(int) is the size of the receiver buffer
727         4)  1 70 10 5 is the recvcounts array
728   */
729   double clock = smpi_process()->simulated_elapsed();
730
731   unsigned long comm_size = MPI_COMM_WORLD->size();
732   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
733   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
734   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
735   std::vector<int> senddisps(comm_size, 0);
736   std::vector<int> recvdisps(comm_size, 0);
737
738   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
739                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
740                                       : MPI_DEFAULT_TYPE;
741   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
742                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
743                                      : MPI_DEFAULT_TYPE};
744
745   int send_buf_size=parse_double(action[2]);
746   int recv_buf_size=parse_double(action[3+comm_size]);
747   int my_proc_id = Actor::self()->getPid();
748   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
749   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
750
751   for (unsigned int i = 0; i < comm_size; i++) {
752     (*sendcounts)[i] = std::stoi(action[3 + i]);
753     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
754   }
755   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
756   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
757
758   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
759                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
760                                                        Datatype::encode(MPI_CURRENT_TYPE),
761                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
762
763   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
764                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
765
766   TRACE_smpi_comm_out(my_proc_id);
767   log_timed_action (action, clock);
768 }
769
770 }} // namespace simgrid::smpi
771
772 /** @brief Only initialize the replay, don't do it for real */
773 void smpi_replay_init(int* argc, char*** argv)
774 {
775   simgrid::smpi::Process::init(argc, argv);
776   smpi_process()->mark_as_initialized();
777   smpi_process()->set_replaying(true);
778
779   int my_proc_id = Actor::self()->getPid();
780   TRACE_smpi_init(my_proc_id);
781   TRACE_smpi_computing_init(my_proc_id);
782   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
783   TRACE_smpi_comm_out(my_proc_id);
784   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
785   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
786   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
787   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
788   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
789
790   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
791   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
792   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
793   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
794   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
795   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
796   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
797   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
798   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
799   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
800   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
801   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
802   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
803   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
804   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
805   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
806   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
807   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
808   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
809   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
810   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
811
812   //if we have a delayed start, sleep here.
813   if(*argc>2){
814     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
815     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
816     smpi_execute_flops(value);
817   } else {
818     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
819     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
820     smpi_execute_flops(0.0);
821   }
822 }
823
824 /** @brief actually run the replay after initialization */
825 void smpi_replay_main(int* argc, char*** argv)
826 {
827   simgrid::xbt::replay_runner(*argc, *argv);
828
829   /* and now, finalize everything */
830   /* One active process will stop. Decrease the counter*/
831   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
832   if (not get_reqq_self()->empty()) {
833     unsigned int count_requests=get_reqq_self()->size();
834     MPI_Request requests[count_requests];
835     MPI_Status status[count_requests];
836     unsigned int i=0;
837
838     for (auto const& req : *get_reqq_self()) {
839       requests[i] = req;
840       i++;
841     }
842     simgrid::smpi::Request::waitall(count_requests, requests, status);
843   }
844   delete get_reqq_self();
845   active_processes--;
846
847   if(active_processes==0){
848     /* Last process alive speaking: end the simulated timer */
849     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
850     smpi_free_replay_tmp_buffers();
851   }
852
853   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
854
855   smpi_process()->finalize();
856
857   TRACE_smpi_comm_out(Actor::self()->getPid());
858   TRACE_smpi_finalize(Actor::self()->getPid());
859 }
860
861 /** @brief chain a replay initialization and a replay start */
862 void smpi_replay_run(int* argc, char*** argv)
863 {
864   smpi_replay_init(argc, argv);
865   smpi_replay_main(argc, argv);
866 }