Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines for 2022.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16
17 #include <cmath>
18 #include <limits>
19 #include <memory>
20 #include <numeric>
21 #include <tuple>
22 #include <unordered_map>
23 #include <vector>
24
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
26 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
27 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
28 // this could go into a header file.
29 namespace hash_tuple {
30 template <typename TT> class hash {
31 public:
32   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
33 };
34
35 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
36 {
37   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
38 }
39
40 // Recursive template code derived from Matthieu M.
41 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
42 public:
43   static void apply(size_t& seed, Tuple const& tuple)
44   {
45     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
46     hash_combine(seed, std::get<Index>(tuple));
47   }
48 };
49
50 template <class Tuple> class HashValueImpl<Tuple, 0> {
51 public:
52   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
53 };
54
55 template <typename... TT> class hash<std::tuple<TT...>> {
56 public:
57   size_t operator()(std::tuple<TT...> const& tt) const
58   {
59     size_t seed = 0;
60     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
61     return seed;
62   }
63 };
64 }
65
66 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
67 {
68   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
69     std::string s = boost::algorithm::join(action, " ");
70     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
71   }
72 }
73
74 /* Helper functions */
75 static double parse_double(const std::string& string)
76 {
77   return xbt_str_parse_double(string.c_str(), "not a double");
78 }
79
80 template <typename T> static T parse_integer(const std::string& string)
81 {
82   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
83   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
84                  val <= static_cast<double>(std::numeric_limits<T>::max()),
85              "out of range: %g", val);
86   return static_cast<T>(val);
87 }
88
89 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
90 {
91   return i < action.size() ? std::stoi(action[i]) : 0;
92 }
93
94 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
95 {
96   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
97 }
98
99 namespace simgrid {
100 namespace smpi {
101
102 namespace replay {
103 MPI_Datatype MPI_DEFAULT_TYPE;
104
105 class RequestStorage {
106 private:
107   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
108   using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
109
110   req_storage_t store;
111
112 public:
113   RequestStorage() = default;
114   size_t size() const { return store.size(); }
115
116   req_storage_t& get_store() { return store; }
117
118   void get_requests(std::vector<MPI_Request>& vec) const
119   {
120     for (auto const& pair : store) {
121       auto& req       = pair.second;
122       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
123       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
124         vec.push_back(pair.second);
125         pair.second->print_request("MM");
126       }
127     }
128   }
129
130     MPI_Request find(int src, int dst, int tag)
131     {
132       auto it = store.find(req_key_t(src, dst, tag));
133       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
134     }
135
136     void remove(const Request* req)
137     {
138       if (req == MPI_REQUEST_NULL) return;
139
140       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
141     }
142
143     void add(MPI_Request req)
144     {
145       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
146         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
147     }
148
149     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
150     void addNullRequest(int src, int dst, int tag)
151     {
152       store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
153                     MPI_REQUEST_NULL});
154     }
155 };
156
157 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 {
159   CHECK_ACTION_PARAMS(action, 3, 0)
160   src = std::stoi(action[2]);
161   dst = std::stoi(action[3]);
162   tag = std::stoi(action[4]);
163 }
164
165 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 {
167   CHECK_ACTION_PARAMS(action, 3, 1)
168   partner = std::stoi(action[2]);
169   tag     = std::stoi(action[3]);
170   size      = parse_integer<size_t>(action[4]);
171   datatype1 = parse_datatype(action, 5);
172 }
173
174 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 1, 0)
177   flops = parse_double(action[2]);
178 }
179
180 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 {
182   CHECK_ACTION_PARAMS(action, 1, 0)
183   time = parse_double(action[2]);
184 }
185
186 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 {
188   CHECK_ACTION_PARAMS(action, 2, 0)
189   filename = std::string(action[2]);
190   line = std::stoi(action[3]);
191 }
192
193 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 {
195   CHECK_ACTION_PARAMS(action, 1, 2)
196   size      = parse_integer<size_t>(action[2]);
197   root      = parse_root(action, 3);
198   datatype1 = parse_datatype(action, 4);
199 }
200
201 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 {
203   CHECK_ACTION_PARAMS(action, 2, 2)
204   comm_size = parse_integer<unsigned>(action[2]);
205   comp_size = parse_double(action[3]);
206   root      = parse_root(action, 4);
207   datatype1 = parse_datatype(action, 5);
208 }
209
210 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   comm_size = parse_integer<unsigned>(action[2]);
214   comp_size = parse_double(action[3]);
215   datatype1 = parse_datatype(action, 4);
216 }
217
218 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
219 {
220   CHECK_ACTION_PARAMS(action, 2, 1)
221   comm_size = MPI_COMM_WORLD->size();
222   send_size = parse_integer<int>(action[2]);
223   recv_size = parse_integer<int>(action[3]);
224   datatype1 = parse_datatype(action, 4);
225   datatype2 = parse_datatype(action, 5);
226 }
227
228 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
229 {
230   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
231         0 gather 68 68 0 0 0
232       where:
233         1) 68 is the sendcounts
234         2) 68 is the recvcounts
235         3) 0 is the root node
236         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
237         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
238   */
239   CHECK_ACTION_PARAMS(action, 2, 3)
240   comm_size = MPI_COMM_WORLD->size();
241   send_size = parse_integer<int>(action[2]);
242   recv_size = parse_integer<int>(action[3]);
243
244   if (name == "gather") {
245     root      = parse_root(action, 4);
246     datatype1 = parse_datatype(action, 5);
247     datatype2 = parse_datatype(action, 6);
248   } else {
249     root      = 0;
250     datatype1 = parse_datatype(action, 4);
251     datatype2 = parse_datatype(action, 5);
252   }
253 }
254
255 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
256 {
257   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
258        0 gather 68 68 10 10 10 0 0 0
259      where:
260        1) 68 is the sendcount
261        2) 68 10 10 10 is the recvcounts
262        3) 0 is the root node
263        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
264        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
265   */
266   comm_size = MPI_COMM_WORLD->size();
267   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
268   send_size  = parse_integer<int>(action[2]);
269   disps      = std::vector<int>(comm_size, 0);
270   recvcounts = std::make_shared<std::vector<int>>(comm_size);
271
272   if (name == "gatherv") {
273     root      = parse_root(action, 3 + comm_size);
274     datatype1 = parse_datatype(action, 4 + comm_size);
275     datatype2 = parse_datatype(action, 5 + comm_size);
276   } else {
277     root                = 0;
278     unsigned disp_index = 0;
279     /* The 3 comes from "0 gather <sendcount>", which must always be present.
280      * The + comm_size is the recvcounts array, which must also be present
281      */
282     if (action.size() > 3 + comm_size + comm_size) {
283       // datatype + disp are specified
284       datatype1  = parse_datatype(action, 3 + comm_size);
285       datatype2  = parse_datatype(action, 4 + comm_size);
286       disp_index = 5 + comm_size;
287     } else if (action.size() > 3 + comm_size + 2) {
288       // disps specified; datatype is not specified; use the default one
289       datatype1  = MPI_DEFAULT_TYPE;
290       datatype2  = MPI_DEFAULT_TYPE;
291       disp_index = 3 + comm_size;
292     } else {
293       // no disp specified, maybe only datatype,
294       datatype1 = parse_datatype(action, 3 + comm_size);
295       datatype2 = parse_datatype(action, 4 + comm_size);
296     }
297
298     if (disp_index != 0) {
299       xbt_assert(disp_index + comm_size <= action.size());
300       for (unsigned i = 0; i < comm_size; i++)
301         disps[i]          = std::stoi(action[disp_index + i]);
302     }
303   }
304
305   for (unsigned int i = 0; i < comm_size; i++) {
306     (*recvcounts)[i] = std::stoi(action[i + 3]);
307   }
308   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
309 }
310
311 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
312 {
313   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
314         0 gather 68 68 0 0 0
315       where:
316         1) 68 is the sendcounts
317         2) 68 is the recvcounts
318         3) 0 is the root node
319         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
320         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
321   */
322   comm_size = MPI_COMM_WORLD->size();
323   CHECK_ACTION_PARAMS(action, 2, 3)
324   comm_size = MPI_COMM_WORLD->size();
325   send_size = parse_integer<int>(action[2]);
326   recv_size = parse_integer<int>(action[3]);
327   root      = parse_root(action, 4);
328   datatype1 = parse_datatype(action, 5);
329   datatype2 = parse_datatype(action, 6);
330 }
331
332 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
333 {
334   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
335      0 gather 68 10 10 10 68 0 0 0
336       where:
337       1) 68 10 10 10 is the sendcounts
338       2) 68 is the recvcount
339       3) 0 is the root node
340       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
341       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
342   */
343   comm_size = MPI_COMM_WORLD->size();
344   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
345   recv_size  = parse_integer<int>(action[2 + comm_size]);
346   disps      = std::vector<int>(comm_size, 0);
347   sendcounts = std::make_shared<std::vector<int>>(comm_size);
348
349   root      = parse_root(action, 3 + comm_size);
350   datatype1 = parse_datatype(action, 4 + comm_size);
351   datatype2 = parse_datatype(action, 5 + comm_size);
352
353   for (unsigned int i = 0; i < comm_size; i++) {
354     (*sendcounts)[i] = std::stoi(action[i + 2]);
355   }
356   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
357 }
358
359 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
360 {
361   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
362        0 reducescatter 275427 275427 275427 204020 11346849 0
363      where:
364        1) The first four values after the name of the action declare the recvcounts array
365        2) The value 11346849 is the amount of instructions
366        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
367   */
368   comm_size = MPI_COMM_WORLD->size();
369   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
370   comp_size  = parse_double(action[2 + comm_size]);
371   recvcounts = std::make_shared<std::vector<int>>(comm_size);
372   datatype1  = parse_datatype(action, 3 + comm_size);
373
374   for (unsigned int i = 0; i < comm_size; i++) {
375     (*recvcounts)[i]= std::stoi(action[i + 2]);
376   }
377   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
378 }
379
380 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
381 {
382   CHECK_ACTION_PARAMS(action, 2, 1)
383   size      = parse_integer<size_t>(action[2]);
384   comp_size = parse_double(action[3]);
385   datatype1 = parse_datatype(action, 4);
386 }
387
388 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
389 {
390   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
391         0 alltoallv 100 1 7 10 12 100 1 70 10 5
392      where:
393       1) 100 is the size of the send buffer *sizeof(int),
394       2) 1 7 10 12 is the sendcounts array
395       3) 100*sizeof(int) is the size of the receiver buffer
396       4)  1 70 10 5 is the recvcounts array
397   */
398   comm_size = MPI_COMM_WORLD->size();
399   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
400   sendcounts = std::make_shared<std::vector<int>>(comm_size);
401   recvcounts = std::make_shared<std::vector<int>>(comm_size);
402   senddisps  = std::vector<int>(comm_size, 0);
403   recvdisps  = std::vector<int>(comm_size, 0);
404
405   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
406   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
407
408   send_buf_size = parse_integer<int>(action[2]);
409   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
410   for (unsigned int i = 0; i < comm_size; i++) {
411     (*sendcounts)[i] = std::stoi(action[3 + i]);
412     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
413   }
414   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
415   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
416 }
417
418 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
419 {
420   std::string s = boost::algorithm::join(action, " ");
421   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
422   const WaitTestParser& args = get_args();
423   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
424   req_storage.remove(request);
425
426   if (request == MPI_REQUEST_NULL) {
427     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
428      * return.*/
429     return;
430   }
431
432   // Must be taken before Request::wait() since the request may be set to
433   // MPI_REQUEST_NULL by Request::wait!
434   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
435
436   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
437
438   MPI_Status status;
439   Request::wait(&request, &status);
440
441   TRACE_smpi_comm_out(get_pid());
442   if (is_wait_for_receive)
443     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
444 }
445
446 void SendAction::kernel(simgrid::xbt::ReplayAction&)
447 {
448   const SendRecvParser& args = get_args();
449   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
450
451   TRACE_smpi_comm_in(
452       get_pid(), __func__,
453       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
454   if (not TRACE_smpi_view_internals())
455     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
456
457   if (get_name() == "send") {
458     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
459   } else if (get_name() == "isend") {
460     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
461     req_storage.add(request);
462   } else {
463     xbt_die("Don't know this action, %s", get_name().c_str());
464   }
465
466   TRACE_smpi_comm_out(get_pid());
467 }
468
469 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
470 {
471   const SendRecvParser& args = get_args();
472   TRACE_smpi_comm_in(
473       get_pid(), __func__,
474       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
475
476   MPI_Status status;
477   // unknown size from the receiver point of view
478   size_t arg_size = args.size;
479   if (arg_size == 0) {
480     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
481     arg_size = status.count;
482   }
483
484   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
485   if (get_name() == "recv") {
486     is_recv = true;
487     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
488   } else if (get_name() == "irecv") {
489     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
490     req_storage.add(request);
491   } else {
492     THROW_IMPOSSIBLE;
493   }
494
495   TRACE_smpi_comm_out(get_pid());
496   if (is_recv && not TRACE_smpi_view_internals()) {
497     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
498     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
499   }
500 }
501
502 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
503 {
504   const ComputeParser& args = get_args();
505   if (smpi_cfg_simulate_computation()) {
506     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
507   }
508 }
509
510 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
511 {
512   const SleepParser& args = get_args();
513   XBT_DEBUG("Sleep for: %lf secs", args.time);
514   aid_t pid = simgrid::s4u::this_actor::get_pid();
515   TRACE_smpi_sleeping_in(pid, args.time);
516   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
517   TRACE_smpi_sleeping_out(pid);
518 }
519
520 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
521 {
522   const LocationParser& args = get_args();
523   smpi_trace_set_call_location(args.filename.c_str(), args.line);
524 }
525
526 void TestAction::kernel(simgrid::xbt::ReplayAction&)
527 {
528   const WaitTestParser& args = get_args();
529   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
530   req_storage.remove(request);
531   // if request is null here, this may mean that a previous test has succeeded
532   // Different times in traced application and replayed version may lead to this
533   // In this case, ignore the extra calls.
534   if (request != MPI_REQUEST_NULL) {
535     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
536
537     MPI_Status status;
538     int flag = 0;
539     Request::test(&request, &status, &flag);
540
541     XBT_DEBUG("MPI_Test result: %d", flag);
542     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
543      * nullptr.*/
544     if (request == MPI_REQUEST_NULL)
545       req_storage.addNullRequest(args.src, args.dst, args.tag);
546     else
547       req_storage.add(request);
548
549     TRACE_smpi_comm_out(get_pid());
550   }
551 }
552
553 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
554 {
555   CHECK_ACTION_PARAMS(action, 0, 1)
556     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
557     : MPI_BYTE;  // default TAU datatype
558
559   /* start a simulated timer */
560   smpi_process()->simulated_start();
561 }
562
563 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
564 {
565   /* nothing to do */
566 }
567
568 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
569 {
570   const size_t count_requests = req_storage.size();
571
572   if (count_requests > 0) {
573     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
574     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
575     std::vector<MPI_Request> reqs;
576     req_storage.get_requests(reqs);
577     for (auto const& req : reqs) {
578       if (req && (req->flags() & MPI_REQ_RECV)) {
579         sender_receiver.emplace_back(req->src(), req->dst());
580       }
581     }
582     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
583     req_storage.get_store().clear();
584
585     for (auto const& pair : sender_receiver) {
586       TRACE_smpi_recv(pair.first, pair.second, 0);
587     }
588     TRACE_smpi_comm_out(get_pid());
589   }
590 }
591
592 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
593 {
594   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
595   colls::barrier(MPI_COMM_WORLD);
596   TRACE_smpi_comm_out(get_pid());
597 }
598
599 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
600 {
601   const BcastArgParser& args = get_args();
602   TRACE_smpi_comm_in(get_pid(), "action_bcast",
603                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
604                                                     0, Datatype::encode(args.datatype1), ""));
605
606   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
607
608   TRACE_smpi_comm_out(get_pid());
609 }
610
611 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
612 {
613   const ReduceArgParser& args = get_args();
614   TRACE_smpi_comm_in(get_pid(), "action_reduce",
615                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
616                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
617
618   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
619                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
620                 args.root, MPI_COMM_WORLD);
621   if (args.comp_size != 0.0)
622     simgrid::s4u::this_actor::exec_init(args.comp_size)
623       ->set_name("computation")
624       ->start()
625       ->wait();
626
627   TRACE_smpi_comm_out(get_pid());
628 }
629
630 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
631 {
632   const AllReduceArgParser& args = get_args();
633   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
634                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
635                                                     Datatype::encode(args.datatype1), ""));
636
637   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
638                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
639                    MPI_COMM_WORLD);
640   if (args.comp_size != 0.0)
641     simgrid::s4u::this_actor::exec_init(args.comp_size)
642       ->set_name("computation")
643       ->start()
644       ->wait();
645
646   TRACE_smpi_comm_out(get_pid());
647 }
648
649 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
650 {
651   const AllToAllArgParser& args = get_args();
652   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
653                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
654                                                     Datatype::encode(args.datatype1),
655                                                     Datatype::encode(args.datatype2)));
656
657   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
658                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
659                   MPI_COMM_WORLD);
660
661   TRACE_smpi_comm_out(get_pid());
662 }
663
664 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
665 {
666   const GatherArgParser& args = get_args();
667   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
668                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
669                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
670                                                     Datatype::encode(args.datatype2)));
671
672   if (get_name() == "gather") {
673     int rank = MPI_COMM_WORLD->rank();
674     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
675                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
676                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
677   } else
678     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
679                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
680                      MPI_COMM_WORLD);
681
682   TRACE_smpi_comm_out(get_pid());
683 }
684
685 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
686 {
687   int rank = MPI_COMM_WORLD->rank();
688   const GatherVArgParser& args = get_args();
689   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
690                      new simgrid::instr::VarCollTIData(
691                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
692                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
693
694   if (get_name() == "gatherv") {
695     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
696                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
697                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
698   } else {
699     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
701                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
702   }
703
704   TRACE_smpi_comm_out(get_pid());
705 }
706
707 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
708 {
709   int rank = MPI_COMM_WORLD->rank();
710   const ScatterArgParser& args = get_args();
711   TRACE_smpi_comm_in(get_pid(), "action_scatter",
712                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
713                                                     Datatype::encode(args.datatype1),
714                                                     Datatype::encode(args.datatype2)));
715
716   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
717                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
718                  args.datatype2, args.root, MPI_COMM_WORLD);
719
720   TRACE_smpi_comm_out(get_pid());
721 }
722
723 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
724 {
725   int rank = MPI_COMM_WORLD->rank();
726   const ScatterVArgParser& args = get_args();
727   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
728                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
729                                                        nullptr, Datatype::encode(args.datatype1),
730                                                        Datatype::encode(args.datatype2)));
731
732   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
733                   args.sendcounts->data(), args.disps.data(), args.datatype1,
734                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
735                   MPI_COMM_WORLD);
736
737   TRACE_smpi_comm_out(get_pid());
738 }
739
740 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
741 {
742   const ReduceScatterArgParser& args = get_args();
743   TRACE_smpi_comm_in(
744       get_pid(), "action_reducescatter",
745       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
746                                         /* ugly as we use datatype field to pass computation as string */
747                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
748                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
749                                         Datatype::encode(args.datatype1)));
750
751   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
752                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
753                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
754   if (args.comp_size != 0.0)
755     simgrid::s4u::this_actor::exec_init(args.comp_size)
756       ->set_name("computation")
757       ->start()
758       ->wait();
759   TRACE_smpi_comm_out(get_pid());
760 }
761
762 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
763 {
764   const ScanArgParser& args = get_args();
765   TRACE_smpi_comm_in(get_pid(), "action_scan",
766                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
767                      args.size, 0, Datatype::encode(args.datatype1), ""));
768   if (get_name() == "scan")
769     colls::scan(send_buffer(args.size * args.datatype1->size()),
770               recv_buffer(args.size * args.datatype1->size()), args.size,
771               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
772   else
773     colls::exscan(send_buffer(args.size * args.datatype1->size()),
774               recv_buffer(args.size * args.datatype1->size()), args.size,
775               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
776
777   if (args.comp_size != 0.0)
778     simgrid::s4u::this_actor::exec_init(args.comp_size)
779       ->set_name("computation")
780       ->start()
781       ->wait();
782   TRACE_smpi_comm_out(get_pid());
783 }
784
785 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
786 {
787   const AllToAllVArgParser& args = get_args();
788   TRACE_smpi_comm_in(get_pid(), __func__,
789                      new simgrid::instr::VarCollTIData(
790                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
791                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
792
793   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
794                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
795                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
796
797   TRACE_smpi_comm_out(get_pid());
798 }
799 } // Replay Namespace
800 }} // namespace simgrid::smpi
801
802 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
803 /** @brief Only initialize the replay, don't do it for real */
804 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
805 {
806   xbt_assert(not smpi_process()->initializing());
807
808   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
809   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
810   simgrid::smpi::ActorExt::init();
811
812   smpi_process()->mark_as_initialized();
813   smpi_process()->set_replaying(true);
814
815   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
816   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
817   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
818   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
819   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
822   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
823   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
824   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
825   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
826   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
827   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
828   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
829   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
830   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
831   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
832   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
833   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
834   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
835   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
836   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
837   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
838   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
839   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
840   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
841   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
842   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
843   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
844   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
845   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
846
847   //if we have a delayed start, sleep here.
848   if (start_delay_flops > 0) {
849     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
850     private_execute_flops(start_delay_flops);
851   } else {
852     // Wait for the other actors to initialize also
853     simgrid::s4u::this_actor::yield();
854   }
855   if(_smpi_init_sleep > 0)
856     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
857 }
858
859 /** @brief actually run the replay after initialization */
860 void smpi_replay_main(int rank, const char* private_trace_filename)
861 {
862   static int active_processes = 0;
863   active_processes++;
864   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
865   std::string rank_string                      = std::to_string(rank);
866   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
867
868   /* and now, finalize everything */
869   /* One active process will stop. Decrease the counter*/
870   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
871   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
872   if (count_requests > 0) {
873     std::vector<MPI_Request> requests(count_requests);
874     unsigned int i=0;
875
876     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
877       requests[i] = pair.second;
878       i++;
879     }
880     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
881   }
882
883   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
884     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
885
886   active_processes--;
887
888   if(active_processes==0){
889     /* Last process alive speaking: end the simulated timer */
890     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
891     smpi_free_replay_tmp_buffers();
892   }
893
894   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
895                      new simgrid::instr::NoOpTIData("finalize"));
896
897   smpi_process()->finalize();
898
899   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
900 }
901
902 /** @brief chain a replay initialization and a replay start */
903 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
904 {
905   smpi_replay_init(instance_id, rank, start_delay_flops);
906   smpi_replay_main(rank, private_trace_filename);
907 }