Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Structured binding here too.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16 #include "xbt/str.h"
17
18 #include <cmath>
19 #include <limits>
20 #include <memory>
21 #include <numeric>
22 #include <tuple>
23 #include <unordered_map>
24 #include <vector>
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
30 namespace hash_tuple {
31 template <typename TT> class hash {
32 public:
33   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
34 };
35
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
37 {
38   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
39 }
40
41 // Recursive template code derived from Matthieu M.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
43 public:
44   static void apply(size_t& seed, Tuple const& tuple)
45   {
46     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47     hash_combine(seed, std::get<Index>(tuple));
48   }
49 };
50
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
52 public:
53   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
54 };
55
56 template <typename... TT> class hash<std::tuple<TT...>> {
57 public:
58   size_t operator()(std::tuple<TT...> const& tt) const
59   {
60     size_t seed = 0;
61     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
62     return seed;
63   }
64 };
65 }
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper functions */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "not a double");
79 }
80
81 template <typename T> static T parse_integer(const std::string& string)
82 {
83   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85                  val <= static_cast<double>(std::numeric_limits<T>::max()),
86              "out of range: %g", val);
87   return static_cast<T>(val);
88 }
89
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 {
92   return i < action.size() ? std::stoi(action[i]) : 0;
93 }
94
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 {
97   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
98 }
99
100 namespace simgrid {
101 namespace smpi {
102
103 namespace replay {
104 MPI_Datatype MPI_DEFAULT_TYPE;
105
106 class RequestStorage {
107 private:
108   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
109   using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
110
111   req_storage_t store;
112
113 public:
114   RequestStorage() = default;
115   size_t size() const { return store.size(); }
116
117   req_storage_t& get_store() { return store; }
118
119   void get_requests(std::vector<MPI_Request>& vec) const
120   {
121     for (auto const& [_, reqs] : store) {
122       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
123       for (auto& req: reqs){
124         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
125           vec.push_back(req);
126           req->print_request("MM");
127         }
128       }
129     }
130   }
131
132   MPI_Request pop(int src, int dst, int tag)
133   {
134     auto it = store.find(req_key_t(src, dst, tag));
135     if (it == store.end())
136      return MPI_REQUEST_NULL;
137     MPI_Request req = it->second.front();
138     it->second.pop_front();
139     if(it->second.empty())
140       store.erase(req_key_t(src, dst, tag));
141     return req;
142   }
143
144   void add(MPI_Request req)
145   {
146     if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
147       store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
148     }
149   }
150
151   /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
152   void addNullRequest(int src, int dst, int tag)
153   {
154     int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
155     int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
156     store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
157   }
158 };
159
160 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
161 {
162   CHECK_ACTION_PARAMS(action, 3, 0)
163   src = std::stoi(action[2]);
164   dst = std::stoi(action[3]);
165   tag = std::stoi(action[4]);
166 }
167
168 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 3, 1)
171   partner = std::stoi(action[2]);
172   tag     = std::stoi(action[3]);
173   size      = parse_integer<size_t>(action[4]);
174   datatype1 = parse_datatype(action, 5);
175 }
176
177 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
178 {
179   CHECK_ACTION_PARAMS(action, 1, 0)
180   flops = parse_double(action[2]);
181 }
182
183 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
184 {
185   CHECK_ACTION_PARAMS(action, 1, 0)
186   time = parse_double(action[2]);
187 }
188
189 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
190 {
191   CHECK_ACTION_PARAMS(action, 2, 0)
192   filename = std::string(action[2]);
193   line = std::stoi(action[3]);
194 }
195
196 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
197 {
198   CHECK_ACTION_PARAMS(action, 1, 2)
199   size      = parse_integer<size_t>(action[2]);
200   root      = parse_root(action, 3);
201   datatype1 = parse_datatype(action, 4);
202 }
203
204 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
205 {
206   CHECK_ACTION_PARAMS(action, 2, 2)
207   comm_size = parse_integer<unsigned>(action[2]);
208   comp_size = parse_double(action[3]);
209   root      = parse_root(action, 4);
210   datatype1 = parse_datatype(action, 5);
211 }
212
213 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
214 {
215   CHECK_ACTION_PARAMS(action, 2, 1)
216   comm_size = parse_integer<unsigned>(action[2]);
217   comp_size = parse_double(action[3]);
218   datatype1 = parse_datatype(action, 4);
219 }
220
221 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
222 {
223   CHECK_ACTION_PARAMS(action, 2, 1)
224   comm_size = MPI_COMM_WORLD->size();
225   send_size = parse_integer<int>(action[2]);
226   recv_size = parse_integer<int>(action[3]);
227   datatype1 = parse_datatype(action, 4);
228   datatype2 = parse_datatype(action, 5);
229 }
230
231 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
232 {
233   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
234         0 gather 68 68 0 0 0
235       where:
236         1) 68 is the sendcounts
237         2) 68 is the recvcounts
238         3) 0 is the root node
239         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
240         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
241   */
242   CHECK_ACTION_PARAMS(action, 2, 3)
243   comm_size = MPI_COMM_WORLD->size();
244   send_size = parse_integer<int>(action[2]);
245   recv_size = parse_integer<int>(action[3]);
246
247   if (name == "gather") {
248     root      = parse_root(action, 4);
249     datatype1 = parse_datatype(action, 5);
250     datatype2 = parse_datatype(action, 6);
251   } else {
252     root      = 0;
253     datatype1 = parse_datatype(action, 4);
254     datatype2 = parse_datatype(action, 5);
255   }
256 }
257
258 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
259 {
260   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
261        0 gather 68 68 10 10 10 0 0 0
262      where:
263        1) 68 is the sendcount
264        2) 68 10 10 10 is the recvcounts
265        3) 0 is the root node
266        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
267        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
268   */
269   comm_size = MPI_COMM_WORLD->size();
270   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
271   send_size  = parse_integer<int>(action[2]);
272   disps      = std::vector<int>(comm_size, 0);
273   recvcounts = std::make_shared<std::vector<int>>(comm_size);
274
275   if (name == "gatherv") {
276     root      = parse_root(action, 3 + comm_size);
277     datatype1 = parse_datatype(action, 4 + comm_size);
278     datatype2 = parse_datatype(action, 5 + comm_size);
279   } else {
280     root                = 0;
281     unsigned disp_index = 0;
282     /* The 3 comes from "0 gather <sendcount>", which must always be present.
283      * The + comm_size is the recvcounts array, which must also be present
284      */
285     if (action.size() > 3 + comm_size + comm_size) {
286       // datatype + disp are specified
287       datatype1  = parse_datatype(action, 3 + comm_size);
288       datatype2  = parse_datatype(action, 4 + comm_size);
289       disp_index = 5 + comm_size;
290     } else if (action.size() > 3 + comm_size + 2) {
291       // disps specified; datatype is not specified; use the default one
292       datatype1  = MPI_DEFAULT_TYPE;
293       datatype2  = MPI_DEFAULT_TYPE;
294       disp_index = 3 + comm_size;
295     } else {
296       // no disp specified, maybe only datatype,
297       datatype1 = parse_datatype(action, 3 + comm_size);
298       datatype2 = parse_datatype(action, 4 + comm_size);
299     }
300
301     if (disp_index != 0) {
302       xbt_assert(disp_index + comm_size <= action.size());
303       for (unsigned i = 0; i < comm_size; i++)
304         disps[i]          = std::stoi(action[disp_index + i]);
305     }
306   }
307
308   for (unsigned int i = 0; i < comm_size; i++) {
309     (*recvcounts)[i] = std::stoi(action[i + 3]);
310   }
311   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
312 }
313
314 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
315 {
316   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
317         0 gather 68 68 0 0 0
318       where:
319         1) 68 is the sendcounts
320         2) 68 is the recvcounts
321         3) 0 is the root node
322         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
323         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
324   */
325   comm_size = MPI_COMM_WORLD->size();
326   CHECK_ACTION_PARAMS(action, 2, 3)
327   comm_size = MPI_COMM_WORLD->size();
328   send_size = parse_integer<int>(action[2]);
329   recv_size = parse_integer<int>(action[3]);
330   root      = parse_root(action, 4);
331   datatype1 = parse_datatype(action, 5);
332   datatype2 = parse_datatype(action, 6);
333 }
334
335 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
336 {
337   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
338      0 gather 68 10 10 10 68 0 0 0
339       where:
340       1) 68 10 10 10 is the sendcounts
341       2) 68 is the recvcount
342       3) 0 is the root node
343       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
344       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
345   */
346   comm_size = MPI_COMM_WORLD->size();
347   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
348   recv_size  = parse_integer<int>(action[2 + comm_size]);
349   disps      = std::vector<int>(comm_size, 0);
350   sendcounts = std::make_shared<std::vector<int>>(comm_size);
351
352   root      = parse_root(action, 3 + comm_size);
353   datatype1 = parse_datatype(action, 4 + comm_size);
354   datatype2 = parse_datatype(action, 5 + comm_size);
355
356   for (unsigned int i = 0; i < comm_size; i++) {
357     (*sendcounts)[i] = std::stoi(action[i + 2]);
358   }
359   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
360 }
361
362 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
363 {
364   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
365        0 reducescatter 275427 275427 275427 204020 11346849 0
366      where:
367        1) The first four values after the name of the action declare the recvcounts array
368        2) The value 11346849 is the amount of instructions
369        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
370   */
371   comm_size = MPI_COMM_WORLD->size();
372   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
373   comp_size  = parse_double(action[2 + comm_size]);
374   recvcounts = std::make_shared<std::vector<int>>(comm_size);
375   datatype1  = parse_datatype(action, 3 + comm_size);
376
377   for (unsigned int i = 0; i < comm_size; i++) {
378     (*recvcounts)[i]= std::stoi(action[i + 2]);
379   }
380   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
381 }
382
383 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
384 {
385   CHECK_ACTION_PARAMS(action, 2, 1)
386   size      = parse_integer<size_t>(action[2]);
387   comp_size = parse_double(action[3]);
388   datatype1 = parse_datatype(action, 4);
389 }
390
391 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
392 {
393   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
394         0 alltoallv 100 1 7 10 12 100 1 70 10 5
395      where:
396       1) 100 is the size of the send buffer *sizeof(int),
397       2) 1 7 10 12 is the sendcounts array
398       3) 100*sizeof(int) is the size of the receiver buffer
399       4)  1 70 10 5 is the recvcounts array
400   */
401   comm_size = MPI_COMM_WORLD->size();
402   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
403   sendcounts = std::make_shared<std::vector<int>>(comm_size);
404   recvcounts = std::make_shared<std::vector<int>>(comm_size);
405   senddisps  = std::vector<int>(comm_size, 0);
406   recvdisps  = std::vector<int>(comm_size, 0);
407
408   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
409   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
410
411   send_buf_size = parse_integer<int>(action[2]);
412   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
413   for (unsigned int i = 0; i < comm_size; i++) {
414     (*sendcounts)[i] = std::stoi(action[3 + i]);
415     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
416   }
417   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
418   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
419 }
420
421 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
422 {
423   std::string s = boost::algorithm::join(action, " ");
424   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
425   const WaitTestParser& args = get_args();
426   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
427
428   if (request == MPI_REQUEST_NULL) {
429     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
430      * return.*/
431     return;
432   }
433
434   // Must be taken before Request::wait() since the request may be set to
435   // MPI_REQUEST_NULL by Request::wait!
436   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
437
438   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
439
440   MPI_Status status;
441   Request::wait(&request, &status);
442   if(request!=MPI_REQUEST_NULL)
443     Request::unref(&request);
444   TRACE_smpi_comm_out(get_pid());
445   if (is_wait_for_receive)
446     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
447 }
448
449 void SendAction::kernel(simgrid::xbt::ReplayAction&)
450 {
451   const SendRecvParser& args = get_args();
452   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
453
454   TRACE_smpi_comm_in(
455       get_pid(), __func__,
456       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
457   if (not TRACE_smpi_view_internals())
458     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
459
460   if (get_name() == "send") {
461     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
462   } else if (get_name() == "isend") {
463     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
464     req_storage.add(request);
465   } else {
466     xbt_die("Don't know this action, %s", get_name().c_str());
467   }
468
469   TRACE_smpi_comm_out(get_pid());
470 }
471
472 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
473 {
474   const SendRecvParser& args = get_args();
475   TRACE_smpi_comm_in(
476       get_pid(), __func__,
477       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
478
479   MPI_Status status;
480   // unknown size from the receiver point of view
481   size_t arg_size = args.size;
482   if (arg_size == 0) {
483     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
484     arg_size = status.count;
485   }
486
487   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
488   if (get_name() == "recv") {
489     is_recv = true;
490     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
491   } else if (get_name() == "irecv") {
492     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
493     req_storage.add(request);
494   } else {
495     THROW_IMPOSSIBLE;
496   }
497
498   TRACE_smpi_comm_out(get_pid());
499   if (is_recv && not TRACE_smpi_view_internals()) {
500     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
501     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
502   }
503 }
504
505 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
506 {
507   const ComputeParser& args = get_args();
508   if (smpi_cfg_simulate_computation()) {
509     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
510   }
511 }
512
513 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
514 {
515   const SleepParser& args = get_args();
516   XBT_DEBUG("Sleep for: %lf secs", args.time);
517   aid_t pid = simgrid::s4u::this_actor::get_pid();
518   TRACE_smpi_sleeping_in(pid, args.time);
519   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
520   TRACE_smpi_sleeping_out(pid);
521 }
522
523 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
524 {
525   const LocationParser& args = get_args();
526   smpi_trace_set_call_location(args.filename.c_str(), args.line);
527 }
528
529 void TestAction::kernel(simgrid::xbt::ReplayAction&)
530 {
531   const WaitTestParser& args = get_args();
532   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
533   // if request is null here, this may mean that a previous test has succeeded
534   // Different times in traced application and replayed version may lead to this
535   // In this case, ignore the extra calls.
536   if (request != MPI_REQUEST_NULL) {
537     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
538
539     MPI_Status status;
540     int flag = 0;
541     Request::test(&request, &status, &flag);
542
543     XBT_DEBUG("MPI_Test result: %d", flag);
544     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
545      * nullptr.*/
546     if (request == MPI_REQUEST_NULL)
547       req_storage.addNullRequest(args.src, args.dst, args.tag);
548     else
549       req_storage.add(request);
550
551     TRACE_smpi_comm_out(get_pid());
552   }
553 }
554
555 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
556 {
557   CHECK_ACTION_PARAMS(action, 0, 1)
558     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
559     : MPI_BYTE;  // default TAU datatype
560
561   /* start a simulated timer */
562   smpi_process()->simulated_start();
563 }
564
565 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
566 {
567   /* nothing to do */
568 }
569
570 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
571 {
572   if (req_storage.size() > 0) {
573     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
574     std::vector<MPI_Request> reqs;
575     req_storage.get_requests(reqs);
576     unsigned long count_requests = reqs.size();
577     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
578     for (auto const& req : reqs) {
579       if (req && (req->flags() & MPI_REQ_RECV)) {
580         sender_receiver.emplace_back(req->src(), req->dst());
581       }
582     }
583     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
584     req_storage.get_store().clear();
585
586     for (MPI_Request& req : reqs)
587       if (req != MPI_REQUEST_NULL)
588         Request::unref(&req);
589
590     for (auto const& [src, dst] : sender_receiver) {
591       TRACE_smpi_recv(src, dst, 0);
592     }
593     TRACE_smpi_comm_out(get_pid());
594   }
595 }
596
597 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
598 {
599   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
600   colls::barrier(MPI_COMM_WORLD);
601   TRACE_smpi_comm_out(get_pid());
602 }
603
604 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
605 {
606   const BcastArgParser& args = get_args();
607   TRACE_smpi_comm_in(get_pid(), "action_bcast",
608                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
609                                                     0, Datatype::encode(args.datatype1), ""));
610
611   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
612
613   TRACE_smpi_comm_out(get_pid());
614 }
615
616 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
617 {
618   const ReduceArgParser& args = get_args();
619   TRACE_smpi_comm_in(get_pid(), "action_reduce",
620                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
621                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
622
623   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
624                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
625                 args.root, MPI_COMM_WORLD);
626   if (args.comp_size != 0.0)
627     simgrid::s4u::this_actor::exec_init(args.comp_size)
628       ->set_name("computation")
629       ->start()
630       ->wait();
631
632   TRACE_smpi_comm_out(get_pid());
633 }
634
635 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
636 {
637   const AllReduceArgParser& args = get_args();
638   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
639                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
640                                                     Datatype::encode(args.datatype1), ""));
641
642   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
643                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
644                    MPI_COMM_WORLD);
645   if (args.comp_size != 0.0)
646     simgrid::s4u::this_actor::exec_init(args.comp_size)
647       ->set_name("computation")
648       ->start()
649       ->wait();
650
651   TRACE_smpi_comm_out(get_pid());
652 }
653
654 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
655 {
656   const AllToAllArgParser& args = get_args();
657   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
658                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
659                                                     Datatype::encode(args.datatype1),
660                                                     Datatype::encode(args.datatype2)));
661
662   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
663                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
664                   MPI_COMM_WORLD);
665
666   TRACE_smpi_comm_out(get_pid());
667 }
668
669 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
670 {
671   const GatherArgParser& args = get_args();
672   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
673                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
674                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
675                                                     Datatype::encode(args.datatype2)));
676
677   if (get_name() == "gather") {
678     int rank = MPI_COMM_WORLD->rank();
679     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
681                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
682   } else
683     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
684                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
685                      MPI_COMM_WORLD);
686
687   TRACE_smpi_comm_out(get_pid());
688 }
689
690 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
691 {
692   int rank = MPI_COMM_WORLD->rank();
693   const GatherVArgParser& args = get_args();
694   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
695                      new simgrid::instr::VarCollTIData(
696                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
697                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
698
699   if (get_name() == "gatherv") {
700     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
702                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
703   } else {
704     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
705                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
706                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
707   }
708
709   TRACE_smpi_comm_out(get_pid());
710 }
711
712 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
713 {
714   int rank = MPI_COMM_WORLD->rank();
715   const ScatterArgParser& args = get_args();
716   TRACE_smpi_comm_in(get_pid(), "action_scatter",
717                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
718                                                     Datatype::encode(args.datatype1),
719                                                     Datatype::encode(args.datatype2)));
720
721   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
722                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
723                  args.datatype2, args.root, MPI_COMM_WORLD);
724
725   TRACE_smpi_comm_out(get_pid());
726 }
727
728 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
729 {
730   int rank = MPI_COMM_WORLD->rank();
731   const ScatterVArgParser& args = get_args();
732   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
733                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
734                                                        nullptr, Datatype::encode(args.datatype1),
735                                                        Datatype::encode(args.datatype2)));
736
737   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
738                   args.sendcounts->data(), args.disps.data(), args.datatype1,
739                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
740                   MPI_COMM_WORLD);
741
742   TRACE_smpi_comm_out(get_pid());
743 }
744
745 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
746 {
747   const ReduceScatterArgParser& args = get_args();
748   TRACE_smpi_comm_in(
749       get_pid(), "action_reducescatter",
750       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
751                                         /* ugly as we use datatype field to pass computation as string */
752                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
753                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
754                                         Datatype::encode(args.datatype1)));
755
756   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
757                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
758                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
759   if (args.comp_size != 0.0)
760     simgrid::s4u::this_actor::exec_init(args.comp_size)
761       ->set_name("computation")
762       ->start()
763       ->wait();
764   TRACE_smpi_comm_out(get_pid());
765 }
766
767 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
768 {
769   const ScanArgParser& args = get_args();
770   TRACE_smpi_comm_in(get_pid(), "action_scan",
771                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
772                      args.size, 0, Datatype::encode(args.datatype1), ""));
773   if (get_name() == "scan")
774     colls::scan(send_buffer(args.size * args.datatype1->size()),
775               recv_buffer(args.size * args.datatype1->size()), args.size,
776               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
777   else
778     colls::exscan(send_buffer(args.size * args.datatype1->size()),
779               recv_buffer(args.size * args.datatype1->size()), args.size,
780               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
781
782   if (args.comp_size != 0.0)
783     simgrid::s4u::this_actor::exec_init(args.comp_size)
784       ->set_name("computation")
785       ->start()
786       ->wait();
787   TRACE_smpi_comm_out(get_pid());
788 }
789
790 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
791 {
792   const AllToAllVArgParser& args = get_args();
793   TRACE_smpi_comm_in(get_pid(), __func__,
794                      new simgrid::instr::VarCollTIData(
795                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
796                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
797
798   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
799                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
800                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
801
802   TRACE_smpi_comm_out(get_pid());
803 }
804 } // Replay Namespace
805 }} // namespace simgrid::smpi
806
807 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
808 /** @brief Only initialize the replay, don't do it for real */
809 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
810 {
811   xbt_assert(not smpi_process()->initializing());
812
813   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
814   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
815   simgrid::smpi::ActorExt::init();
816
817   smpi_process()->mark_as_initialized();
818   smpi_process()->set_replaying(true);
819
820   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
821   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
822   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
823   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
824   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
825   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
826   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
827   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
828   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
829   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
830   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
831   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
832   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
833   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
834   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
835   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
836   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
837   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
838   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
839   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
840   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
841   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
842   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
843   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
844   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
845   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
846   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
847   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
848   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
849   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
850   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
851
852   //if we have a delayed start, sleep here.
853   if (start_delay_flops > 0) {
854     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
855     private_execute_flops(start_delay_flops);
856   } else {
857     // Wait for the other actors to initialize also
858     simgrid::s4u::this_actor::yield();
859   }
860   if(_smpi_init_sleep > 0)
861     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
862 }
863
864 /** @brief actually run the replay after initialization */
865 void smpi_replay_main(int rank, const char* private_trace_filename)
866 {
867   static int active_processes = 0;
868   active_processes++;
869   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
870   std::string rank_string                      = std::to_string(rank);
871   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
872
873   /* and now, finalize everything */
874   /* One active process will stop. Decrease the counter*/
875   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
876   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
877   if (count_requests > 0) {
878     std::vector<MPI_Request> requests(count_requests);
879     unsigned int i=0;
880
881     for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
882       for (auto& req : reqs) {
883         requests[i] = req; // FIXME: overwritten at each iteration?
884       }
885       i++;
886     }
887     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
888   }
889
890   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
891     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
892
893   active_processes--;
894
895   if(active_processes==0){
896     /* Last process alive speaking: end the simulated timer */
897     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
898     smpi_free_replay_tmp_buffers();
899   }
900
901   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
902                      new simgrid::instr::NoOpTIData("finalize"));
903
904   smpi_process()->finalize();
905
906   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
907 }
908
909 /** @brief chain a replay initialization and a replay start */
910 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
911 {
912   smpi_replay_init(instance_id, rank, start_delay_flops);
913   smpi_replay_main(rank, private_trace_filename);
914 }