Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Concatenate nested namespaces (sonar).
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16 #include "xbt/str.h"
17
18 #include <cmath>
19 #include <limits>
20 #include <memory>
21 #include <numeric>
22 #include <tuple>
23 #include <unordered_map>
24 #include <vector>
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
30 namespace hash_tuple {
31 template <typename TT> class hash {
32 public:
33   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
34 };
35
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
37 {
38   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
39 }
40
41 // Recursive template code derived from Matthieu M.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
43 public:
44   static void apply(size_t& seed, Tuple const& tuple)
45   {
46     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47     hash_combine(seed, std::get<Index>(tuple));
48   }
49 };
50
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
52 public:
53   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
54 };
55
56 template <typename... TT> class hash<std::tuple<TT...>> {
57 public:
58   size_t operator()(std::tuple<TT...> const& tt) const
59   {
60     size_t seed = 0;
61     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
62     return seed;
63   }
64 };
65 }
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper functions */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "not a double");
79 }
80
81 template <typename T> static T parse_integer(const std::string& string)
82 {
83   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85                  val <= static_cast<double>(std::numeric_limits<T>::max()),
86              "out of range: %g", val);
87   return static_cast<T>(val);
88 }
89
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 {
92   return i < action.size() ? std::stoi(action[i]) : 0;
93 }
94
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 {
97   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
98 }
99
100 namespace simgrid::smpi::replay {
101 MPI_Datatype MPI_DEFAULT_TYPE;
102
103 class RequestStorage {
104 private:
105   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
106   using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
107
108   req_storage_t store;
109
110 public:
111   RequestStorage() = default;
112   size_t size() const { return store.size(); }
113
114   req_storage_t& get_store() { return store; }
115
116   void get_requests(std::vector<MPI_Request>& vec) const
117   {
118     for (auto const& [_, reqs] : store) {
119       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
120       for (auto& req: reqs){
121         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
122           vec.push_back(req);
123           req->print_request("MM");
124         }
125       }
126     }
127   }
128
129   MPI_Request pop(int src, int dst, int tag)
130   {
131     auto it = store.find(req_key_t(src, dst, tag));
132     if (it == store.end())
133      return MPI_REQUEST_NULL;
134     MPI_Request req = it->second.front();
135     it->second.pop_front();
136     if(it->second.empty())
137       store.erase(req_key_t(src, dst, tag));
138     return req;
139   }
140
141   void add(MPI_Request req)
142   {
143     if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
144       store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
145     }
146   }
147
148   /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149   void addNullRequest(int src, int dst, int tag)
150   {
151     int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
152     int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
153     store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
154   }
155 };
156
157 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 {
159   CHECK_ACTION_PARAMS(action, 3, 0)
160   src = std::stoi(action[2]);
161   dst = std::stoi(action[3]);
162   tag = std::stoi(action[4]);
163 }
164
165 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 {
167   CHECK_ACTION_PARAMS(action, 3, 1)
168   partner = std::stoi(action[2]);
169   tag     = std::stoi(action[3]);
170   size      = parse_integer<size_t>(action[4]);
171   datatype1 = parse_datatype(action, 5);
172 }
173
174 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 1, 0)
177   flops = parse_double(action[2]);
178 }
179
180 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 {
182   CHECK_ACTION_PARAMS(action, 1, 0)
183   time = parse_double(action[2]);
184 }
185
186 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 {
188   CHECK_ACTION_PARAMS(action, 2, 0)
189   filename = std::string(action[2]);
190   line = std::stoi(action[3]);
191 }
192
193 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 {
195   CHECK_ACTION_PARAMS(action, 1, 2)
196   size      = parse_integer<size_t>(action[2]);
197   root      = parse_root(action, 3);
198   datatype1 = parse_datatype(action, 4);
199 }
200
201 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 {
203   CHECK_ACTION_PARAMS(action, 2, 2)
204   comm_size = parse_integer<unsigned>(action[2]);
205   comp_size = parse_double(action[3]);
206   root      = parse_root(action, 4);
207   datatype1 = parse_datatype(action, 5);
208 }
209
210 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   comm_size = parse_integer<unsigned>(action[2]);
214   comp_size = parse_double(action[3]);
215   datatype1 = parse_datatype(action, 4);
216 }
217
218 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
219 {
220   CHECK_ACTION_PARAMS(action, 2, 1)
221   comm_size = MPI_COMM_WORLD->size();
222   send_size = parse_integer<int>(action[2]);
223   recv_size = parse_integer<int>(action[3]);
224   datatype1 = parse_datatype(action, 4);
225   datatype2 = parse_datatype(action, 5);
226 }
227
228 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
229 {
230   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
231         0 gather 68 68 0 0 0
232       where:
233         1) 68 is the sendcounts
234         2) 68 is the recvcounts
235         3) 0 is the root node
236         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
237         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
238   */
239   CHECK_ACTION_PARAMS(action, 2, 3)
240   comm_size = MPI_COMM_WORLD->size();
241   send_size = parse_integer<int>(action[2]);
242   recv_size = parse_integer<int>(action[3]);
243
244   if (name == "gather") {
245     root      = parse_root(action, 4);
246     datatype1 = parse_datatype(action, 5);
247     datatype2 = parse_datatype(action, 6);
248   } else {
249     root      = 0;
250     datatype1 = parse_datatype(action, 4);
251     datatype2 = parse_datatype(action, 5);
252   }
253 }
254
255 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
256 {
257   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
258        0 gather 68 68 10 10 10 0 0 0
259      where:
260        1) 68 is the sendcount
261        2) 68 10 10 10 is the recvcounts
262        3) 0 is the root node
263        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
264        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
265   */
266   comm_size = MPI_COMM_WORLD->size();
267   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
268   send_size  = parse_integer<int>(action[2]);
269   disps      = std::vector<int>(comm_size, 0);
270   recvcounts = std::make_shared<std::vector<int>>(comm_size);
271
272   if (name == "gatherv") {
273     root      = parse_root(action, 3 + comm_size);
274     datatype1 = parse_datatype(action, 4 + comm_size);
275     datatype2 = parse_datatype(action, 5 + comm_size);
276   } else {
277     root                = 0;
278     unsigned disp_index = 0;
279     /* The 3 comes from "0 gather <sendcount>", which must always be present.
280      * The + comm_size is the recvcounts array, which must also be present
281      */
282     if (action.size() > 3 + comm_size + comm_size) {
283       // datatype + disp are specified
284       datatype1  = parse_datatype(action, 3 + comm_size);
285       datatype2  = parse_datatype(action, 4 + comm_size);
286       disp_index = 5 + comm_size;
287     } else if (action.size() > 3 + comm_size + 2) {
288       // disps specified; datatype is not specified; use the default one
289       datatype1  = MPI_DEFAULT_TYPE;
290       datatype2  = MPI_DEFAULT_TYPE;
291       disp_index = 3 + comm_size;
292     } else {
293       // no disp specified, maybe only datatype,
294       datatype1 = parse_datatype(action, 3 + comm_size);
295       datatype2 = parse_datatype(action, 4 + comm_size);
296     }
297
298     if (disp_index != 0) {
299       xbt_assert(disp_index + comm_size <= action.size());
300       for (unsigned i = 0; i < comm_size; i++)
301         disps[i]          = std::stoi(action[disp_index + i]);
302     }
303   }
304
305   for (unsigned int i = 0; i < comm_size; i++) {
306     (*recvcounts)[i] = std::stoi(action[i + 3]);
307   }
308   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
309 }
310
311 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
312 {
313   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
314         0 gather 68 68 0 0 0
315       where:
316         1) 68 is the sendcounts
317         2) 68 is the recvcounts
318         3) 0 is the root node
319         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
320         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
321   */
322   comm_size = MPI_COMM_WORLD->size();
323   CHECK_ACTION_PARAMS(action, 2, 3)
324   comm_size = MPI_COMM_WORLD->size();
325   send_size = parse_integer<int>(action[2]);
326   recv_size = parse_integer<int>(action[3]);
327   root      = parse_root(action, 4);
328   datatype1 = parse_datatype(action, 5);
329   datatype2 = parse_datatype(action, 6);
330 }
331
332 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
333 {
334   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
335      0 gather 68 10 10 10 68 0 0 0
336       where:
337       1) 68 10 10 10 is the sendcounts
338       2) 68 is the recvcount
339       3) 0 is the root node
340       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
341       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
342   */
343   comm_size = MPI_COMM_WORLD->size();
344   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
345   recv_size  = parse_integer<int>(action[2 + comm_size]);
346   disps      = std::vector<int>(comm_size, 0);
347   sendcounts = std::make_shared<std::vector<int>>(comm_size);
348
349   root      = parse_root(action, 3 + comm_size);
350   datatype1 = parse_datatype(action, 4 + comm_size);
351   datatype2 = parse_datatype(action, 5 + comm_size);
352
353   for (unsigned int i = 0; i < comm_size; i++) {
354     (*sendcounts)[i] = std::stoi(action[i + 2]);
355   }
356   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
357 }
358
359 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
360 {
361   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
362        0 reducescatter 275427 275427 275427 204020 11346849 0
363      where:
364        1) The first four values after the name of the action declare the recvcounts array
365        2) The value 11346849 is the amount of instructions
366        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
367   */
368   comm_size = MPI_COMM_WORLD->size();
369   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
370   comp_size  = parse_double(action[2 + comm_size]);
371   recvcounts = std::make_shared<std::vector<int>>(comm_size);
372   datatype1  = parse_datatype(action, 3 + comm_size);
373
374   for (unsigned int i = 0; i < comm_size; i++) {
375     (*recvcounts)[i]= std::stoi(action[i + 2]);
376   }
377   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
378 }
379
380 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
381 {
382   CHECK_ACTION_PARAMS(action, 2, 1)
383   size      = parse_integer<size_t>(action[2]);
384   comp_size = parse_double(action[3]);
385   datatype1 = parse_datatype(action, 4);
386 }
387
388 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
389 {
390   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
391         0 alltoallv 100 1 7 10 12 100 1 70 10 5
392      where:
393       1) 100 is the size of the send buffer *sizeof(int),
394       2) 1 7 10 12 is the sendcounts array
395       3) 100*sizeof(int) is the size of the receiver buffer
396       4)  1 70 10 5 is the recvcounts array
397   */
398   comm_size = MPI_COMM_WORLD->size();
399   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
400   sendcounts = std::make_shared<std::vector<int>>(comm_size);
401   recvcounts = std::make_shared<std::vector<int>>(comm_size);
402   senddisps  = std::vector<int>(comm_size, 0);
403   recvdisps  = std::vector<int>(comm_size, 0);
404
405   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
406   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
407
408   send_buf_size = parse_integer<int>(action[2]);
409   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
410   for (unsigned int i = 0; i < comm_size; i++) {
411     (*sendcounts)[i] = std::stoi(action[3 + i]);
412     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
413   }
414   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
415   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
416 }
417
418 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
419 {
420   std::string s = boost::algorithm::join(action, " ");
421   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
422   const WaitTestParser& args = get_args();
423   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
424
425   if (request == MPI_REQUEST_NULL) {
426     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
427      * return.*/
428     return;
429   }
430
431   // Must be taken before Request::wait() since the request may be set to
432   // MPI_REQUEST_NULL by Request::wait!
433   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
434
435   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
436
437   MPI_Status status;
438   Request::wait(&request, &status);
439   if(request!=MPI_REQUEST_NULL)
440     Request::unref(&request);
441   TRACE_smpi_comm_out(get_pid());
442   if (is_wait_for_receive)
443     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
444 }
445
446 void SendAction::kernel(simgrid::xbt::ReplayAction&)
447 {
448   const SendRecvParser& args = get_args();
449   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
450
451   TRACE_smpi_comm_in(
452       get_pid(), __func__,
453       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
454   if (not TRACE_smpi_view_internals())
455     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
456
457   if (get_name() == "send") {
458     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
459   } else if (get_name() == "isend") {
460     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
461     req_storage.add(request);
462   } else {
463     xbt_die("Don't know this action, %s", get_name().c_str());
464   }
465
466   TRACE_smpi_comm_out(get_pid());
467 }
468
469 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
470 {
471   const SendRecvParser& args = get_args();
472   TRACE_smpi_comm_in(
473       get_pid(), __func__,
474       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
475
476   MPI_Status status;
477   // unknown size from the receiver point of view
478   size_t arg_size = args.size;
479   if (arg_size == 0) {
480     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
481     arg_size = status.count;
482   }
483
484   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
485   if (get_name() == "recv") {
486     is_recv = true;
487     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
488   } else if (get_name() == "irecv") {
489     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
490     req_storage.add(request);
491   } else {
492     THROW_IMPOSSIBLE;
493   }
494
495   TRACE_smpi_comm_out(get_pid());
496   if (is_recv && not TRACE_smpi_view_internals()) {
497     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
498     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
499   }
500 }
501
502 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
503 {
504   const ComputeParser& args = get_args();
505   if (smpi_cfg_simulate_computation()) {
506     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
507   }
508 }
509
510 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
511 {
512   const SleepParser& args = get_args();
513   XBT_DEBUG("Sleep for: %lf secs", args.time);
514   aid_t pid = simgrid::s4u::this_actor::get_pid();
515   TRACE_smpi_sleeping_in(pid, args.time);
516   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
517   TRACE_smpi_sleeping_out(pid);
518 }
519
520 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
521 {
522   const LocationParser& args = get_args();
523   smpi_trace_set_call_location(args.filename.c_str(), args.line);
524 }
525
526 void TestAction::kernel(simgrid::xbt::ReplayAction&)
527 {
528   const WaitTestParser& args = get_args();
529   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
530   // if request is null here, this may mean that a previous test has succeeded
531   // Different times in traced application and replayed version may lead to this
532   // In this case, ignore the extra calls.
533   if (request != MPI_REQUEST_NULL) {
534     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
535
536     MPI_Status status;
537     int flag = 0;
538     Request::test(&request, &status, &flag);
539
540     XBT_DEBUG("MPI_Test result: %d", flag);
541     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
542      * nullptr.*/
543     if (request == MPI_REQUEST_NULL)
544       req_storage.addNullRequest(args.src, args.dst, args.tag);
545     else
546       req_storage.add(request);
547
548     TRACE_smpi_comm_out(get_pid());
549   }
550 }
551
552 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
553 {
554   CHECK_ACTION_PARAMS(action, 0, 1)
555     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
556     : MPI_BYTE;  // default TAU datatype
557
558   /* start a simulated timer */
559   smpi_process()->simulated_start();
560 }
561
562 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
563 {
564   /* nothing to do */
565 }
566
567 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
568 {
569   if (req_storage.size() > 0) {
570     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
571     std::vector<MPI_Request> reqs;
572     req_storage.get_requests(reqs);
573     unsigned long count_requests = reqs.size();
574     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
575     for (auto const& req : reqs) {
576       if (req && (req->flags() & MPI_REQ_RECV)) {
577         sender_receiver.emplace_back(req->src(), req->dst());
578       }
579     }
580     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
581     req_storage.get_store().clear();
582
583     for (MPI_Request& req : reqs)
584       if (req != MPI_REQUEST_NULL)
585         Request::unref(&req);
586
587     for (auto const& [src, dst] : sender_receiver) {
588       TRACE_smpi_recv(src, dst, 0);
589     }
590     TRACE_smpi_comm_out(get_pid());
591   }
592 }
593
594 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
595 {
596   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
597   colls::barrier(MPI_COMM_WORLD);
598   TRACE_smpi_comm_out(get_pid());
599 }
600
601 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
602 {
603   const BcastArgParser& args = get_args();
604   TRACE_smpi_comm_in(get_pid(), "action_bcast",
605                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
606                                                     0, Datatype::encode(args.datatype1), ""));
607
608   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
609
610   TRACE_smpi_comm_out(get_pid());
611 }
612
613 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
614 {
615   const ReduceArgParser& args = get_args();
616   TRACE_smpi_comm_in(get_pid(), "action_reduce",
617                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
618                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
619
620   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
621                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
622                 args.root, MPI_COMM_WORLD);
623   if (args.comp_size != 0.0)
624     simgrid::s4u::this_actor::exec_init(args.comp_size)
625       ->set_name("computation")
626       ->start()
627       ->wait();
628
629   TRACE_smpi_comm_out(get_pid());
630 }
631
632 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
633 {
634   const AllReduceArgParser& args = get_args();
635   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
636                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
637                                                     Datatype::encode(args.datatype1), ""));
638
639   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
640                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
641                    MPI_COMM_WORLD);
642   if (args.comp_size != 0.0)
643     simgrid::s4u::this_actor::exec_init(args.comp_size)
644       ->set_name("computation")
645       ->start()
646       ->wait();
647
648   TRACE_smpi_comm_out(get_pid());
649 }
650
651 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
652 {
653   const AllToAllArgParser& args = get_args();
654   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
655                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
656                                                     Datatype::encode(args.datatype1),
657                                                     Datatype::encode(args.datatype2)));
658
659   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
660                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
661                   MPI_COMM_WORLD);
662
663   TRACE_smpi_comm_out(get_pid());
664 }
665
666 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
667 {
668   const GatherArgParser& args = get_args();
669   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
670                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
671                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
672                                                     Datatype::encode(args.datatype2)));
673
674   if (get_name() == "gather") {
675     int rank = MPI_COMM_WORLD->rank();
676     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
677                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
678                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
679   } else
680     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
681                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
682                      MPI_COMM_WORLD);
683
684   TRACE_smpi_comm_out(get_pid());
685 }
686
687 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
688 {
689   int rank = MPI_COMM_WORLD->rank();
690   const GatherVArgParser& args = get_args();
691   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
692                      new simgrid::instr::VarCollTIData(
693                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
694                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
695
696   if (get_name() == "gatherv") {
697     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
698                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
699                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
700   } else {
701     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
702                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
703                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
704   }
705
706   TRACE_smpi_comm_out(get_pid());
707 }
708
709 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
710 {
711   int rank = MPI_COMM_WORLD->rank();
712   const ScatterArgParser& args = get_args();
713   TRACE_smpi_comm_in(get_pid(), "action_scatter",
714                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
715                                                     Datatype::encode(args.datatype1),
716                                                     Datatype::encode(args.datatype2)));
717
718   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
719                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
720                  args.datatype2, args.root, MPI_COMM_WORLD);
721
722   TRACE_smpi_comm_out(get_pid());
723 }
724
725 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
726 {
727   int rank = MPI_COMM_WORLD->rank();
728   const ScatterVArgParser& args = get_args();
729   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
730                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
731                                                        nullptr, Datatype::encode(args.datatype1),
732                                                        Datatype::encode(args.datatype2)));
733
734   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
735                   args.sendcounts->data(), args.disps.data(), args.datatype1,
736                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
737                   MPI_COMM_WORLD);
738
739   TRACE_smpi_comm_out(get_pid());
740 }
741
742 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
743 {
744   const ReduceScatterArgParser& args = get_args();
745   TRACE_smpi_comm_in(
746       get_pid(), "action_reducescatter",
747       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
748                                         /* ugly as we use datatype field to pass computation as string */
749                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
750                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
751                                         Datatype::encode(args.datatype1)));
752
753   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
754                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
755                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
756   if (args.comp_size != 0.0)
757     simgrid::s4u::this_actor::exec_init(args.comp_size)
758       ->set_name("computation")
759       ->start()
760       ->wait();
761   TRACE_smpi_comm_out(get_pid());
762 }
763
764 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
765 {
766   const ScanArgParser& args = get_args();
767   TRACE_smpi_comm_in(get_pid(), "action_scan",
768                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
769                      args.size, 0, Datatype::encode(args.datatype1), ""));
770   if (get_name() == "scan")
771     colls::scan(send_buffer(args.size * args.datatype1->size()),
772               recv_buffer(args.size * args.datatype1->size()), args.size,
773               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
774   else
775     colls::exscan(send_buffer(args.size * args.datatype1->size()),
776               recv_buffer(args.size * args.datatype1->size()), args.size,
777               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
778
779   if (args.comp_size != 0.0)
780     simgrid::s4u::this_actor::exec_init(args.comp_size)
781       ->set_name("computation")
782       ->start()
783       ->wait();
784   TRACE_smpi_comm_out(get_pid());
785 }
786
787 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
788 {
789   const AllToAllVArgParser& args = get_args();
790   TRACE_smpi_comm_in(get_pid(), __func__,
791                      new simgrid::instr::VarCollTIData(
792                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
793                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
794
795   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
796                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
797                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
798
799   TRACE_smpi_comm_out(get_pid());
800 }
801 } // namespace simgrid::smpi::replay
802
803 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
804 /** @brief Only initialize the replay, don't do it for real */
805 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
806 {
807   xbt_assert(not smpi_process()->initializing());
808
809   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
810   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
811   simgrid::smpi::ActorExt::init();
812
813   smpi_process()->mark_as_initialized();
814   smpi_process()->set_replaying(true);
815
816   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
817   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
818   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
819   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
823   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
824   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
825   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
826   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
827   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
828   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
829   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
830   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
831   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
832   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
833   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
834   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
835   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
836   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
837   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
838   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
839   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
840   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
841   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
842   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
843   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
844   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
845   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
846   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
847
848   //if we have a delayed start, sleep here.
849   if (start_delay_flops > 0) {
850     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
851     private_execute_flops(start_delay_flops);
852   } else {
853     // Wait for the other actors to initialize also
854     simgrid::s4u::this_actor::yield();
855   }
856   if(_smpi_init_sleep > 0)
857     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
858 }
859
860 /** @brief actually run the replay after initialization */
861 void smpi_replay_main(int rank, const char* private_trace_filename)
862 {
863   static int active_processes = 0;
864   active_processes++;
865   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
866   std::string rank_string                      = std::to_string(rank);
867   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
868
869   /* and now, finalize everything */
870   /* One active process will stop. Decrease the counter*/
871   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
872   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
873   if (count_requests > 0) {
874     std::vector<MPI_Request> requests(count_requests);
875     unsigned int i=0;
876
877     for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
878       for (auto& req : reqs) {
879         requests[i] = req; // FIXME: overwritten at each iteration?
880       }
881       i++;
882     }
883     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
884   }
885
886   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
887     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
888
889   active_processes--;
890
891   if(active_processes==0){
892     /* Last process alive speaking: end the simulated timer */
893     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
894     smpi_free_replay_tmp_buffers();
895   }
896
897   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
898                      new simgrid::instr::NoOpTIData("finalize"));
899
900   smpi_process()->finalize();
901
902   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
903 }
904
905 /** @brief chain a replay initialization and a replay start */
906 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
907 {
908   smpi_replay_init(instance_id, rank, start_delay_flops);
909   smpi_replay_main(rank, private_trace_filename);
910 }