Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill redundant code.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16 #include "xbt/str.h"
17
18 #include <cmath>
19 #include <limits>
20 #include <memory>
21 #include <numeric>
22 #include <tuple>
23 #include <unordered_map>
24 #include <vector>
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
30 namespace hash_tuple {
31 template <typename TT> class hash {
32 public:
33   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
34 };
35
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
37 {
38   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
39 }
40
41 // Recursive template code derived from Matthieu M.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
43 public:
44   static void apply(size_t& seed, Tuple const& tuple)
45   {
46     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47     hash_combine(seed, std::get<Index>(tuple));
48   }
49 };
50
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
52 public:
53   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
54 };
55
56 template <typename... TT> class hash<std::tuple<TT...>> {
57 public:
58   size_t operator()(std::tuple<TT...> const& tt) const
59   {
60     size_t seed = 0;
61     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
62     return seed;
63   }
64 };
65 }
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper functions */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "not a double");
79 }
80
81 template <typename T> static T parse_integer(const std::string& string)
82 {
83   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85                  val <= static_cast<double>(std::numeric_limits<T>::max()),
86              "out of range: %g", val);
87   return static_cast<T>(val);
88 }
89
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 {
92   return i < action.size() ? std::stoi(action[i]) : 0;
93 }
94
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 {
97   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
98 }
99
100 namespace simgrid {
101 namespace smpi {
102
103 namespace replay {
104 MPI_Datatype MPI_DEFAULT_TYPE;
105
106 class RequestStorage {
107 private:
108   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
109   using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
110
111   req_storage_t store;
112
113 public:
114   RequestStorage() = default;
115   size_t size() const { return store.size(); }
116
117   req_storage_t& get_store() { return store; }
118
119   void get_requests(std::vector<MPI_Request>& vec) const
120   {
121     for (auto const& pair : store) {
122       auto& reqs       = pair.second;
123       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
124       for (auto& req: reqs){
125         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
126           vec.push_back(req);
127           req->print_request("MM");
128         }
129       }
130     }
131   }
132
133   MPI_Request pop(int src, int dst, int tag)
134   {
135     auto it = store.find(req_key_t(src, dst, tag));
136     if (it == store.end())
137      return MPI_REQUEST_NULL;
138     MPI_Request req = it->second.front();
139     it->second.pop_front();
140     if(it->second.empty())
141       store.erase(req_key_t(src, dst, tag));
142     return req;
143   }
144
145   void add(MPI_Request req)
146   {
147     if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
148       store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
149     }
150   }
151
152   /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
153   void addNullRequest(int src, int dst, int tag)
154   {
155     int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
156     int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
157     store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
158   }
159 };
160
161 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
162 {
163   CHECK_ACTION_PARAMS(action, 3, 0)
164   src = std::stoi(action[2]);
165   dst = std::stoi(action[3]);
166   tag = std::stoi(action[4]);
167 }
168
169 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 {
171   CHECK_ACTION_PARAMS(action, 3, 1)
172   partner = std::stoi(action[2]);
173   tag     = std::stoi(action[3]);
174   size      = parse_integer<size_t>(action[4]);
175   datatype1 = parse_datatype(action, 5);
176 }
177
178 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
179 {
180   CHECK_ACTION_PARAMS(action, 1, 0)
181   flops = parse_double(action[2]);
182 }
183
184 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 {
186   CHECK_ACTION_PARAMS(action, 1, 0)
187   time = parse_double(action[2]);
188 }
189
190 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 {
192   CHECK_ACTION_PARAMS(action, 2, 0)
193   filename = std::string(action[2]);
194   line = std::stoi(action[3]);
195 }
196
197 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
198 {
199   CHECK_ACTION_PARAMS(action, 1, 2)
200   size      = parse_integer<size_t>(action[2]);
201   root      = parse_root(action, 3);
202   datatype1 = parse_datatype(action, 4);
203 }
204
205 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
206 {
207   CHECK_ACTION_PARAMS(action, 2, 2)
208   comm_size = parse_integer<unsigned>(action[2]);
209   comp_size = parse_double(action[3]);
210   root      = parse_root(action, 4);
211   datatype1 = parse_datatype(action, 5);
212 }
213
214 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
215 {
216   CHECK_ACTION_PARAMS(action, 2, 1)
217   comm_size = parse_integer<unsigned>(action[2]);
218   comp_size = parse_double(action[3]);
219   datatype1 = parse_datatype(action, 4);
220 }
221
222 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
223 {
224   CHECK_ACTION_PARAMS(action, 2, 1)
225   comm_size = MPI_COMM_WORLD->size();
226   send_size = parse_integer<int>(action[2]);
227   recv_size = parse_integer<int>(action[3]);
228   datatype1 = parse_datatype(action, 4);
229   datatype2 = parse_datatype(action, 5);
230 }
231
232 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
233 {
234   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
235         0 gather 68 68 0 0 0
236       where:
237         1) 68 is the sendcounts
238         2) 68 is the recvcounts
239         3) 0 is the root node
240         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
241         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
242   */
243   CHECK_ACTION_PARAMS(action, 2, 3)
244   comm_size = MPI_COMM_WORLD->size();
245   send_size = parse_integer<int>(action[2]);
246   recv_size = parse_integer<int>(action[3]);
247
248   if (name == "gather") {
249     root      = parse_root(action, 4);
250     datatype1 = parse_datatype(action, 5);
251     datatype2 = parse_datatype(action, 6);
252   } else {
253     root      = 0;
254     datatype1 = parse_datatype(action, 4);
255     datatype2 = parse_datatype(action, 5);
256   }
257 }
258
259 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
260 {
261   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
262        0 gather 68 68 10 10 10 0 0 0
263      where:
264        1) 68 is the sendcount
265        2) 68 10 10 10 is the recvcounts
266        3) 0 is the root node
267        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
268        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
269   */
270   comm_size = MPI_COMM_WORLD->size();
271   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
272   send_size  = parse_integer<int>(action[2]);
273   disps      = std::vector<int>(comm_size, 0);
274   recvcounts = std::make_shared<std::vector<int>>(comm_size);
275
276   if (name == "gatherv") {
277     root      = parse_root(action, 3 + comm_size);
278     datatype1 = parse_datatype(action, 4 + comm_size);
279     datatype2 = parse_datatype(action, 5 + comm_size);
280   } else {
281     root                = 0;
282     unsigned disp_index = 0;
283     /* The 3 comes from "0 gather <sendcount>", which must always be present.
284      * The + comm_size is the recvcounts array, which must also be present
285      */
286     if (action.size() > 3 + comm_size + comm_size) {
287       // datatype + disp are specified
288       datatype1  = parse_datatype(action, 3 + comm_size);
289       datatype2  = parse_datatype(action, 4 + comm_size);
290       disp_index = 5 + comm_size;
291     } else if (action.size() > 3 + comm_size + 2) {
292       // disps specified; datatype is not specified; use the default one
293       datatype1  = MPI_DEFAULT_TYPE;
294       datatype2  = MPI_DEFAULT_TYPE;
295       disp_index = 3 + comm_size;
296     } else {
297       // no disp specified, maybe only datatype,
298       datatype1 = parse_datatype(action, 3 + comm_size);
299       datatype2 = parse_datatype(action, 4 + comm_size);
300     }
301
302     if (disp_index != 0) {
303       xbt_assert(disp_index + comm_size <= action.size());
304       for (unsigned i = 0; i < comm_size; i++)
305         disps[i]          = std::stoi(action[disp_index + i]);
306     }
307   }
308
309   for (unsigned int i = 0; i < comm_size; i++) {
310     (*recvcounts)[i] = std::stoi(action[i + 3]);
311   }
312   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
313 }
314
315 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
316 {
317   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
318         0 gather 68 68 0 0 0
319       where:
320         1) 68 is the sendcounts
321         2) 68 is the recvcounts
322         3) 0 is the root node
323         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
324         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
325   */
326   comm_size = MPI_COMM_WORLD->size();
327   CHECK_ACTION_PARAMS(action, 2, 3)
328   comm_size = MPI_COMM_WORLD->size();
329   send_size = parse_integer<int>(action[2]);
330   recv_size = parse_integer<int>(action[3]);
331   root      = parse_root(action, 4);
332   datatype1 = parse_datatype(action, 5);
333   datatype2 = parse_datatype(action, 6);
334 }
335
336 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
337 {
338   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
339      0 gather 68 10 10 10 68 0 0 0
340       where:
341       1) 68 10 10 10 is the sendcounts
342       2) 68 is the recvcount
343       3) 0 is the root node
344       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
345       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
346   */
347   comm_size = MPI_COMM_WORLD->size();
348   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
349   recv_size  = parse_integer<int>(action[2 + comm_size]);
350   disps      = std::vector<int>(comm_size, 0);
351   sendcounts = std::make_shared<std::vector<int>>(comm_size);
352
353   root      = parse_root(action, 3 + comm_size);
354   datatype1 = parse_datatype(action, 4 + comm_size);
355   datatype2 = parse_datatype(action, 5 + comm_size);
356
357   for (unsigned int i = 0; i < comm_size; i++) {
358     (*sendcounts)[i] = std::stoi(action[i + 2]);
359   }
360   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
361 }
362
363 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
364 {
365   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
366        0 reducescatter 275427 275427 275427 204020 11346849 0
367      where:
368        1) The first four values after the name of the action declare the recvcounts array
369        2) The value 11346849 is the amount of instructions
370        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
371   */
372   comm_size = MPI_COMM_WORLD->size();
373   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
374   comp_size  = parse_double(action[2 + comm_size]);
375   recvcounts = std::make_shared<std::vector<int>>(comm_size);
376   datatype1  = parse_datatype(action, 3 + comm_size);
377
378   for (unsigned int i = 0; i < comm_size; i++) {
379     (*recvcounts)[i]= std::stoi(action[i + 2]);
380   }
381   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
382 }
383
384 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
385 {
386   CHECK_ACTION_PARAMS(action, 2, 1)
387   size      = parse_integer<size_t>(action[2]);
388   comp_size = parse_double(action[3]);
389   datatype1 = parse_datatype(action, 4);
390 }
391
392 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
393 {
394   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
395         0 alltoallv 100 1 7 10 12 100 1 70 10 5
396      where:
397       1) 100 is the size of the send buffer *sizeof(int),
398       2) 1 7 10 12 is the sendcounts array
399       3) 100*sizeof(int) is the size of the receiver buffer
400       4)  1 70 10 5 is the recvcounts array
401   */
402   comm_size = MPI_COMM_WORLD->size();
403   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
404   sendcounts = std::make_shared<std::vector<int>>(comm_size);
405   recvcounts = std::make_shared<std::vector<int>>(comm_size);
406   senddisps  = std::vector<int>(comm_size, 0);
407   recvdisps  = std::vector<int>(comm_size, 0);
408
409   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
410   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
411
412   send_buf_size = parse_integer<int>(action[2]);
413   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
414   for (unsigned int i = 0; i < comm_size; i++) {
415     (*sendcounts)[i] = std::stoi(action[3 + i]);
416     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
417   }
418   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
419   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
420 }
421
422 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
423 {
424   std::string s = boost::algorithm::join(action, " ");
425   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
426   const WaitTestParser& args = get_args();
427   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
428
429   if (request == MPI_REQUEST_NULL) {
430     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
431      * return.*/
432     return;
433   }
434
435   // Must be taken before Request::wait() since the request may be set to
436   // MPI_REQUEST_NULL by Request::wait!
437   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
438
439   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
440
441   MPI_Status status;
442   Request::wait(&request, &status);
443   if(request!=MPI_REQUEST_NULL)
444     Request::unref(&request);
445   TRACE_smpi_comm_out(get_pid());
446   if (is_wait_for_receive)
447     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
448 }
449
450 void SendAction::kernel(simgrid::xbt::ReplayAction&)
451 {
452   const SendRecvParser& args = get_args();
453   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
454
455   TRACE_smpi_comm_in(
456       get_pid(), __func__,
457       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
458   if (not TRACE_smpi_view_internals())
459     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
460
461   if (get_name() == "send") {
462     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
463   } else if (get_name() == "isend") {
464     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
465     req_storage.add(request);
466   } else {
467     xbt_die("Don't know this action, %s", get_name().c_str());
468   }
469
470   TRACE_smpi_comm_out(get_pid());
471 }
472
473 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
474 {
475   const SendRecvParser& args = get_args();
476   TRACE_smpi_comm_in(
477       get_pid(), __func__,
478       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
479
480   MPI_Status status;
481   // unknown size from the receiver point of view
482   size_t arg_size = args.size;
483   if (arg_size == 0) {
484     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
485     arg_size = status.count;
486   }
487
488   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
489   if (get_name() == "recv") {
490     is_recv = true;
491     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
492   } else if (get_name() == "irecv") {
493     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
494     req_storage.add(request);
495   } else {
496     THROW_IMPOSSIBLE;
497   }
498
499   TRACE_smpi_comm_out(get_pid());
500   if (is_recv && not TRACE_smpi_view_internals()) {
501     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
502     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
503   }
504 }
505
506 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
507 {
508   const ComputeParser& args = get_args();
509   if (smpi_cfg_simulate_computation()) {
510     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
511   }
512 }
513
514 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
515 {
516   const SleepParser& args = get_args();
517   XBT_DEBUG("Sleep for: %lf secs", args.time);
518   aid_t pid = simgrid::s4u::this_actor::get_pid();
519   TRACE_smpi_sleeping_in(pid, args.time);
520   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
521   TRACE_smpi_sleeping_out(pid);
522 }
523
524 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
525 {
526   const LocationParser& args = get_args();
527   smpi_trace_set_call_location(args.filename.c_str(), args.line);
528 }
529
530 void TestAction::kernel(simgrid::xbt::ReplayAction&)
531 {
532   const WaitTestParser& args = get_args();
533   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
534   // if request is null here, this may mean that a previous test has succeeded
535   // Different times in traced application and replayed version may lead to this
536   // In this case, ignore the extra calls.
537   if (request != MPI_REQUEST_NULL) {
538     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
539
540     MPI_Status status;
541     int flag = 0;
542     Request::test(&request, &status, &flag);
543
544     XBT_DEBUG("MPI_Test result: %d", flag);
545     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
546      * nullptr.*/
547     if (request == MPI_REQUEST_NULL)
548       req_storage.addNullRequest(args.src, args.dst, args.tag);
549     else
550       req_storage.add(request);
551
552     TRACE_smpi_comm_out(get_pid());
553   }
554 }
555
556 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
557 {
558   CHECK_ACTION_PARAMS(action, 0, 1)
559     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
560     : MPI_BYTE;  // default TAU datatype
561
562   /* start a simulated timer */
563   smpi_process()->simulated_start();
564 }
565
566 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
567 {
568   /* nothing to do */
569 }
570
571 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
572 {
573   if (req_storage.size() > 0) {
574     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
575     std::vector<MPI_Request> reqs;
576     req_storage.get_requests(reqs);
577     unsigned long count_requests = reqs.size();
578     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
579     for (auto const& req : reqs) {
580       if (req && (req->flags() & MPI_REQ_RECV)) {
581         sender_receiver.emplace_back(req->src(), req->dst());
582       }
583     }
584     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
585     req_storage.get_store().clear();
586
587     for (MPI_Request& req : reqs)
588       if (req != MPI_REQUEST_NULL)
589         Request::unref(&req);
590
591     for (auto const& pair : sender_receiver) {
592       TRACE_smpi_recv(pair.first, pair.second, 0);
593     }
594     TRACE_smpi_comm_out(get_pid());
595   }
596 }
597
598 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
599 {
600   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
601   colls::barrier(MPI_COMM_WORLD);
602   TRACE_smpi_comm_out(get_pid());
603 }
604
605 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
606 {
607   const BcastArgParser& args = get_args();
608   TRACE_smpi_comm_in(get_pid(), "action_bcast",
609                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
610                                                     0, Datatype::encode(args.datatype1), ""));
611
612   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
613
614   TRACE_smpi_comm_out(get_pid());
615 }
616
617 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
618 {
619   const ReduceArgParser& args = get_args();
620   TRACE_smpi_comm_in(get_pid(), "action_reduce",
621                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
622                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
623
624   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
625                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
626                 args.root, MPI_COMM_WORLD);
627   if (args.comp_size != 0.0)
628     simgrid::s4u::this_actor::exec_init(args.comp_size)
629       ->set_name("computation")
630       ->start()
631       ->wait();
632
633   TRACE_smpi_comm_out(get_pid());
634 }
635
636 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
637 {
638   const AllReduceArgParser& args = get_args();
639   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
640                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
641                                                     Datatype::encode(args.datatype1), ""));
642
643   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
644                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
645                    MPI_COMM_WORLD);
646   if (args.comp_size != 0.0)
647     simgrid::s4u::this_actor::exec_init(args.comp_size)
648       ->set_name("computation")
649       ->start()
650       ->wait();
651
652   TRACE_smpi_comm_out(get_pid());
653 }
654
655 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
656 {
657   const AllToAllArgParser& args = get_args();
658   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
659                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
660                                                     Datatype::encode(args.datatype1),
661                                                     Datatype::encode(args.datatype2)));
662
663   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
664                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
665                   MPI_COMM_WORLD);
666
667   TRACE_smpi_comm_out(get_pid());
668 }
669
670 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
671 {
672   const GatherArgParser& args = get_args();
673   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
674                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
675                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
676                                                     Datatype::encode(args.datatype2)));
677
678   if (get_name() == "gather") {
679     int rank = MPI_COMM_WORLD->rank();
680     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
681                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
682                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
683   } else
684     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
685                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
686                      MPI_COMM_WORLD);
687
688   TRACE_smpi_comm_out(get_pid());
689 }
690
691 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
692 {
693   int rank = MPI_COMM_WORLD->rank();
694   const GatherVArgParser& args = get_args();
695   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
696                      new simgrid::instr::VarCollTIData(
697                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
698                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
699
700   if (get_name() == "gatherv") {
701     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
702                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
703                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
704   } else {
705     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
706                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
707                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
708   }
709
710   TRACE_smpi_comm_out(get_pid());
711 }
712
713 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
714 {
715   int rank = MPI_COMM_WORLD->rank();
716   const ScatterArgParser& args = get_args();
717   TRACE_smpi_comm_in(get_pid(), "action_scatter",
718                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
719                                                     Datatype::encode(args.datatype1),
720                                                     Datatype::encode(args.datatype2)));
721
722   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
723                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
724                  args.datatype2, args.root, MPI_COMM_WORLD);
725
726   TRACE_smpi_comm_out(get_pid());
727 }
728
729 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
730 {
731   int rank = MPI_COMM_WORLD->rank();
732   const ScatterVArgParser& args = get_args();
733   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
734                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
735                                                        nullptr, Datatype::encode(args.datatype1),
736                                                        Datatype::encode(args.datatype2)));
737
738   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
739                   args.sendcounts->data(), args.disps.data(), args.datatype1,
740                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
741                   MPI_COMM_WORLD);
742
743   TRACE_smpi_comm_out(get_pid());
744 }
745
746 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
747 {
748   const ReduceScatterArgParser& args = get_args();
749   TRACE_smpi_comm_in(
750       get_pid(), "action_reducescatter",
751       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
752                                         /* ugly as we use datatype field to pass computation as string */
753                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
754                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
755                                         Datatype::encode(args.datatype1)));
756
757   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
758                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
759                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
760   if (args.comp_size != 0.0)
761     simgrid::s4u::this_actor::exec_init(args.comp_size)
762       ->set_name("computation")
763       ->start()
764       ->wait();
765   TRACE_smpi_comm_out(get_pid());
766 }
767
768 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
769 {
770   const ScanArgParser& args = get_args();
771   TRACE_smpi_comm_in(get_pid(), "action_scan",
772                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
773                      args.size, 0, Datatype::encode(args.datatype1), ""));
774   if (get_name() == "scan")
775     colls::scan(send_buffer(args.size * args.datatype1->size()),
776               recv_buffer(args.size * args.datatype1->size()), args.size,
777               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
778   else
779     colls::exscan(send_buffer(args.size * args.datatype1->size()),
780               recv_buffer(args.size * args.datatype1->size()), args.size,
781               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
782
783   if (args.comp_size != 0.0)
784     simgrid::s4u::this_actor::exec_init(args.comp_size)
785       ->set_name("computation")
786       ->start()
787       ->wait();
788   TRACE_smpi_comm_out(get_pid());
789 }
790
791 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
792 {
793   const AllToAllVArgParser& args = get_args();
794   TRACE_smpi_comm_in(get_pid(), __func__,
795                      new simgrid::instr::VarCollTIData(
796                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
797                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
798
799   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
800                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
801                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
802
803   TRACE_smpi_comm_out(get_pid());
804 }
805 } // Replay Namespace
806 }} // namespace simgrid::smpi
807
808 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
809 /** @brief Only initialize the replay, don't do it for real */
810 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
811 {
812   xbt_assert(not smpi_process()->initializing());
813
814   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
815   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
816   simgrid::smpi::ActorExt::init();
817
818   smpi_process()->mark_as_initialized();
819   smpi_process()->set_replaying(true);
820
821   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
822   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
823   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
824   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
825   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
826   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
827   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
828   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
829   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
830   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
831   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
832   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
833   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
834   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
835   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
836   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
837   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
838   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
839   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
840   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
841   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
842   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
843   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
844   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
845   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
846   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
847   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
848   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
849   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
850   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
851   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
852
853   //if we have a delayed start, sleep here.
854   if (start_delay_flops > 0) {
855     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
856     private_execute_flops(start_delay_flops);
857   } else {
858     // Wait for the other actors to initialize also
859     simgrid::s4u::this_actor::yield();
860   }
861   if(_smpi_init_sleep > 0)
862     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
863 }
864
865 /** @brief actually run the replay after initialization */
866 void smpi_replay_main(int rank, const char* private_trace_filename)
867 {
868   static int active_processes = 0;
869   active_processes++;
870   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
871   std::string rank_string                      = std::to_string(rank);
872   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
873
874   /* and now, finalize everything */
875   /* One active process will stop. Decrease the counter*/
876   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
877   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
878   if (count_requests > 0) {
879     std::vector<MPI_Request> requests(count_requests);
880     unsigned int i=0;
881
882     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
883       for (auto& req: pair.second){
884         requests[i] = req;
885       }
886       i++;
887     }
888     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
889   }
890
891   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
892     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
893
894   active_processes--;
895
896   if(active_processes==0){
897     /* Last process alive speaking: end the simulated timer */
898     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
899     smpi_free_replay_tmp_buffers();
900   }
901
902   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
903                      new simgrid::instr::NoOpTIData("finalize"));
904
905   smpi_process()->finalize();
906
907   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
908 }
909
910 /** @brief chain a replay initialization and a replay start */
911 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
912 {
913   smpi_replay_init(instance_id, rank, start_delay_flops);
914   smpi_replay_main(rank, private_trace_filename);
915 }