Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
multiple fixes for replay
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <cmath>
16 #include <limits>
17 #include <memory>
18 #include <numeric>
19 #include <tuple>
20 #include <unordered_map>
21 #include <vector>
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24
25 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
26 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
27 // this could go into a header file.
28 namespace hash_tuple {
29 template <typename TT> class hash {
30 public:
31   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
32 };
33
34 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
35 {
36   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
37 }
38
39 // Recursive template code derived from Matthieu M.
40 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
41 public:
42   static void apply(size_t& seed, Tuple const& tuple)
43   {
44     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
45     hash_combine(seed, std::get<Index>(tuple));
46   }
47 };
48
49 template <class Tuple> class HashValueImpl<Tuple, 0> {
50 public:
51   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
52 };
53
54 template <typename... TT> class hash<std::tuple<TT...>> {
55 public:
56   size_t operator()(std::tuple<TT...> const& tt) const
57   {
58     size_t seed = 0;
59     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
60     return seed;
61   }
62 };
63 }
64
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
66 {
67   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68     std::string s = boost::algorithm::join(action, " ");
69     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
70   }
71 }
72
73 /* Helper functions */
74 static double parse_double(const std::string& string)
75 {
76   return xbt_str_parse_double(string.c_str(), "not a double");
77 }
78
79 template <typename T> static T parse_integer(const std::string& string)
80 {
81   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
82   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
83                  val <= static_cast<double>(std::numeric_limits<T>::max()),
84              "out of range: %g", val);
85   return static_cast<T>(val);
86 }
87
88 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
89 {
90   return i < action.size() ? std::stoi(action[i]) : 0;
91 }
92
93 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
94 {
95   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
96 }
97
98 namespace simgrid {
99 namespace smpi {
100
101 namespace replay {
102 MPI_Datatype MPI_DEFAULT_TYPE;
103
104 class RequestStorage {
105 private:
106   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
107   using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
108
109   req_storage_t store;
110
111 public:
112   RequestStorage() = default;
113   size_t size() const { return store.size(); }
114
115   req_storage_t& get_store() { return store; }
116
117   void get_requests(std::vector<MPI_Request>& vec) const
118   {
119     for (auto const& pair : store) {
120       auto& req       = pair.second;
121       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
122       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123         vec.push_back(pair.second);
124         pair.second->print_request("MM");
125       }
126     }
127   }
128
129     MPI_Request find(int src, int dst, int tag)
130     {
131       auto it = store.find(req_key_t(src, dst, tag));
132       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
133     }
134
135     void remove(const Request* req)
136     {
137       if (req == MPI_REQUEST_NULL) return;
138
139       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
140     }
141
142     void add(MPI_Request req)
143     {
144       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
145         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
146     }
147
148     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149     void addNullRequest(int src, int dst, int tag)
150     {
151       store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
152                     MPI_REQUEST_NULL});
153     }
154 };
155
156 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
157 {
158   CHECK_ACTION_PARAMS(action, 3, 0)
159   src = std::stoi(action[2]);
160   dst = std::stoi(action[3]);
161   tag = std::stoi(action[4]);
162 }
163
164 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
165 {
166   CHECK_ACTION_PARAMS(action, 3, 1)
167   partner = std::stoi(action[2]);
168   tag     = std::stoi(action[3]);
169   size      = parse_integer<size_t>(action[4]);
170   datatype1 = parse_datatype(action, 5);
171 }
172
173 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
174 {
175   CHECK_ACTION_PARAMS(action, 1, 0)
176   flops = parse_double(action[2]);
177 }
178
179 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
180 {
181   CHECK_ACTION_PARAMS(action, 1, 0)
182   time = parse_double(action[2]);
183 }
184
185 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
186 {
187   CHECK_ACTION_PARAMS(action, 2, 0)
188   filename = std::string(action[2]);
189   line = std::stoi(action[3]);
190 }
191
192 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
193 {
194   CHECK_ACTION_PARAMS(action, 1, 2)
195   size      = parse_integer<size_t>(action[2]);
196   root      = parse_root(action, 3);
197   datatype1 = parse_datatype(action, 4);
198 }
199
200 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 2)
203   comm_size = parse_integer<unsigned>(action[2]);
204   comp_size = parse_double(action[3]);
205   root      = parse_root(action, 4);
206   datatype1 = parse_datatype(action, 5);
207 }
208
209 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   comm_size = parse_integer<unsigned>(action[2]);
213   comp_size = parse_double(action[3]);
214   datatype1 = parse_datatype(action, 4);
215 }
216
217 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
218 {
219   CHECK_ACTION_PARAMS(action, 2, 1)
220   comm_size = MPI_COMM_WORLD->size();
221   send_size = parse_integer<int>(action[2]);
222   recv_size = parse_integer<int>(action[3]);
223   datatype1 = parse_datatype(action, 4);
224   datatype2 = parse_datatype(action, 5);
225 }
226
227 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
228 {
229   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
230         0 gather 68 68 0 0 0
231       where:
232         1) 68 is the sendcounts
233         2) 68 is the recvcounts
234         3) 0 is the root node
235         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
236         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
237   */
238   CHECK_ACTION_PARAMS(action, 2, 3)
239   comm_size = MPI_COMM_WORLD->size();
240   send_size = parse_integer<int>(action[2]);
241   recv_size = parse_integer<int>(action[3]);
242
243   if (name == "gather") {
244     root      = parse_root(action, 4);
245     datatype1 = parse_datatype(action, 5);
246     datatype2 = parse_datatype(action, 6);
247   } else {
248     root      = 0;
249     datatype1 = parse_datatype(action, 4);
250     datatype2 = parse_datatype(action, 5);
251   }
252 }
253
254 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
255 {
256   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
257        0 gather 68 68 10 10 10 0 0 0
258      where:
259        1) 68 is the sendcount
260        2) 68 10 10 10 is the recvcounts
261        3) 0 is the root node
262        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
263        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
264   */
265   comm_size = MPI_COMM_WORLD->size();
266   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
267   send_size  = parse_integer<int>(action[2]);
268   disps      = std::vector<int>(comm_size, 0);
269   recvcounts = std::make_shared<std::vector<int>>(comm_size);
270
271   if (name == "gatherv") {
272     root      = parse_root(action, 3 + comm_size);
273     datatype1 = parse_datatype(action, 4 + comm_size);
274     datatype2 = parse_datatype(action, 5 + comm_size);
275   } else {
276     root                = 0;
277     unsigned disp_index = 0;
278     /* The 3 comes from "0 gather <sendcount>", which must always be present.
279      * The + comm_size is the recvcounts array, which must also be present
280      */
281     if (action.size() > 3 + comm_size + comm_size) {
282       // datatype + disp are specified
283       datatype1  = parse_datatype(action, 3 + comm_size);
284       datatype2  = parse_datatype(action, 4 + comm_size);
285       disp_index = 5 + comm_size;
286     } else if (action.size() > 3 + comm_size + 2) {
287       // disps specified; datatype is not specified; use the default one
288       datatype1  = MPI_DEFAULT_TYPE;
289       datatype2  = MPI_DEFAULT_TYPE;
290       disp_index = 3 + comm_size;
291     } else {
292       // no disp specified, maybe only datatype,
293       datatype1 = parse_datatype(action, 3 + comm_size);
294       datatype2 = parse_datatype(action, 4 + comm_size);
295     }
296
297     if (disp_index != 0) {
298       xbt_assert(disp_index + comm_size <= action.size());
299       for (unsigned i = 0; i < comm_size; i++)
300         disps[i]          = std::stoi(action[disp_index + i]);
301     }
302   }
303
304   for (unsigned int i = 0; i < comm_size; i++) {
305     (*recvcounts)[i] = std::stoi(action[i + 3]);
306   }
307   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
308 }
309
310 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
311 {
312   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
313         0 gather 68 68 0 0 0
314       where:
315         1) 68 is the sendcounts
316         2) 68 is the recvcounts
317         3) 0 is the root node
318         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
319         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
320   */
321   comm_size = MPI_COMM_WORLD->size();
322   CHECK_ACTION_PARAMS(action, 2, 3)
323   comm_size = MPI_COMM_WORLD->size();
324   send_size = parse_integer<int>(action[2]);
325   recv_size = parse_integer<int>(action[3]);
326   root      = parse_root(action, 4);
327   datatype1 = parse_datatype(action, 5);
328   datatype2 = parse_datatype(action, 6);
329 }
330
331 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
332 {
333   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
334      0 gather 68 10 10 10 68 0 0 0
335       where:
336       1) 68 10 10 10 is the sendcounts
337       2) 68 is the recvcount
338       3) 0 is the root node
339       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
340       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
341   */
342   comm_size = MPI_COMM_WORLD->size();
343   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
344   recv_size  = parse_integer<int>(action[2 + comm_size]);
345   disps      = std::vector<int>(comm_size, 0);
346   sendcounts = std::make_shared<std::vector<int>>(comm_size);
347
348   root      = parse_root(action, 3 + comm_size);
349   datatype1 = parse_datatype(action, 4 + comm_size);
350   datatype2 = parse_datatype(action, 5 + comm_size);
351
352   for (unsigned int i = 0; i < comm_size; i++) {
353     (*sendcounts)[i] = std::stoi(action[i + 2]);
354   }
355   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
356 }
357
358 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
359 {
360   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
361        0 reducescatter 275427 275427 275427 204020 11346849 0
362      where:
363        1) The first four values after the name of the action declare the recvcounts array
364        2) The value 11346849 is the amount of instructions
365        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
366   */
367   comm_size = MPI_COMM_WORLD->size();
368   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
369   comp_size  = parse_double(action[2 + comm_size]);
370   recvcounts = std::make_shared<std::vector<int>>(comm_size);
371   datatype1  = parse_datatype(action, 3 + comm_size);
372
373   for (unsigned int i = 0; i < comm_size; i++) {
374     (*recvcounts)[i]= std::stoi(action[i + 2]);
375   }
376   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
377 }
378
379 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
380 {
381   CHECK_ACTION_PARAMS(action, 1, 1)
382   size      = parse_integer<size_t>(action[2]);
383   datatype1 = parse_datatype(action, 3);
384 }
385
386 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
387 {
388   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
389         0 alltoallv 100 1 7 10 12 100 1 70 10 5
390      where:
391       1) 100 is the size of the send buffer *sizeof(int),
392       2) 1 7 10 12 is the sendcounts array
393       3) 100*sizeof(int) is the size of the receiver buffer
394       4)  1 70 10 5 is the recvcounts array
395   */
396   comm_size = MPI_COMM_WORLD->size();
397   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
398   sendcounts = std::make_shared<std::vector<int>>(comm_size);
399   recvcounts = std::make_shared<std::vector<int>>(comm_size);
400   senddisps  = std::vector<int>(comm_size, 0);
401   recvdisps  = std::vector<int>(comm_size, 0);
402
403   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
404   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
405
406   send_buf_size = parse_integer<int>(action[2]);
407   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
408   for (unsigned int i = 0; i < comm_size; i++) {
409     (*sendcounts)[i] = std::stoi(action[3 + i]);
410     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
411   }
412   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
413   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
414 }
415
416 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
417 {
418   std::string s = boost::algorithm::join(action, " ");
419   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
420   const WaitTestParser& args = get_args();
421   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
422   req_storage.remove(request);
423
424   if (request == MPI_REQUEST_NULL) {
425     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
426      * return.*/
427     return;
428   }
429
430   // Must be taken before Request::wait() since the request may be set to
431   // MPI_REQUEST_NULL by Request::wait!
432   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
433
434   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
435
436   MPI_Status status;
437   Request::wait(&request, &status);
438
439   TRACE_smpi_comm_out(get_pid());
440   if (is_wait_for_receive)
441     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
442 }
443
444 void SendAction::kernel(simgrid::xbt::ReplayAction&)
445 {
446   const SendRecvParser& args = get_args();
447   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
448
449   TRACE_smpi_comm_in(
450       get_pid(), __func__,
451       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
452   if (not TRACE_smpi_view_internals())
453     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
454
455   if (get_name() == "send") {
456     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
457   } else if (get_name() == "isend") {
458     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
459     req_storage.add(request);
460   } else {
461     xbt_die("Don't know this action, %s", get_name().c_str());
462   }
463
464   TRACE_smpi_comm_out(get_pid());
465 }
466
467 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
468 {
469   const SendRecvParser& args = get_args();
470   TRACE_smpi_comm_in(
471       get_pid(), __func__,
472       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
473
474   MPI_Status status;
475   // unknown size from the receiver point of view
476   size_t arg_size = args.size;
477   if (arg_size == 0) {
478     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
479     arg_size = status.count;
480   }
481
482   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
483   if (get_name() == "recv") {
484     is_recv = true;
485     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
486   } else if (get_name() == "irecv") {
487     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
488     req_storage.add(request);
489   } else {
490     THROW_IMPOSSIBLE;
491   }
492
493   TRACE_smpi_comm_out(get_pid());
494   if (is_recv && not TRACE_smpi_view_internals()) {
495     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
496     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
497   }
498 }
499
500 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
501 {
502   const ComputeParser& args = get_args();
503   if (smpi_cfg_simulate_computation()) {
504     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
505   }
506 }
507
508 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
509 {
510   const SleepParser& args = get_args();
511   XBT_DEBUG("Sleep for: %lf secs", args.time);
512   aid_t pid = simgrid::s4u::this_actor::get_pid();
513   TRACE_smpi_sleeping_in(pid, args.time);
514   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
515   TRACE_smpi_sleeping_out(pid);
516 }
517
518 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
519 {
520   const LocationParser& args = get_args();
521   smpi_trace_set_call_location(args.filename.c_str(), args.line);
522 }
523
524 void TestAction::kernel(simgrid::xbt::ReplayAction&)
525 {
526   const WaitTestParser& args = get_args();
527   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
528   req_storage.remove(request);
529   // if request is null here, this may mean that a previous test has succeeded
530   // Different times in traced application and replayed version may lead to this
531   // In this case, ignore the extra calls.
532   if (request != MPI_REQUEST_NULL) {
533     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
534
535     MPI_Status status;
536     int flag = 0;
537     Request::test(&request, &status, &flag);
538
539     XBT_DEBUG("MPI_Test result: %d", flag);
540     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
541      * nullptr.*/
542     if (request == MPI_REQUEST_NULL)
543       req_storage.addNullRequest(args.src, args.dst, args.tag);
544     else
545       req_storage.add(request);
546
547     TRACE_smpi_comm_out(get_pid());
548   }
549 }
550
551 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
552 {
553   CHECK_ACTION_PARAMS(action, 0, 1)
554     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
555     : MPI_BYTE;  // default TAU datatype
556
557   /* start a simulated timer */
558   smpi_process()->simulated_start();
559 }
560
561 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
562 {
563   /* nothing to do */
564 }
565
566 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
567 {
568   const size_t count_requests = req_storage.size();
569
570   if (count_requests > 0) {
571     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
572     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
573     std::vector<MPI_Request> reqs;
574     req_storage.get_requests(reqs);
575     for (auto const& req : reqs) {
576       if (req && (req->flags() & MPI_REQ_RECV)) {
577         sender_receiver.emplace_back(req->src(), req->dst());
578       }
579     }
580     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
581     req_storage.get_store().clear();
582
583     for (auto const& pair : sender_receiver) {
584       TRACE_smpi_recv(pair.first, pair.second, 0);
585     }
586     TRACE_smpi_comm_out(get_pid());
587   }
588 }
589
590 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
591 {
592   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
593   colls::barrier(MPI_COMM_WORLD);
594   TRACE_smpi_comm_out(get_pid());
595 }
596
597 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
598 {
599   const BcastArgParser& args = get_args();
600   TRACE_smpi_comm_in(get_pid(), "action_bcast",
601                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
602                                                     0, Datatype::encode(args.datatype1), ""));
603
604   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
605
606   TRACE_smpi_comm_out(get_pid());
607 }
608
609 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
610 {
611   const ReduceArgParser& args = get_args();
612   TRACE_smpi_comm_in(get_pid(), "action_reduce",
613                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
614                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
615
616   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
617                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
618                 args.root, MPI_COMM_WORLD);
619   if(args.comp_size != 0.0)
620     private_execute_flops(args.comp_size);
621
622   TRACE_smpi_comm_out(get_pid());
623 }
624
625 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
626 {
627   const AllReduceArgParser& args = get_args();
628   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
629                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
630                                                     Datatype::encode(args.datatype1), ""));
631
632   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
633                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
634                    MPI_COMM_WORLD);
635   if(args.comp_size != 0.0)
636     private_execute_flops(args.comp_size);
637
638   TRACE_smpi_comm_out(get_pid());
639 }
640
641 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
642 {
643   const AllToAllArgParser& args = get_args();
644   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
645                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
646                                                     Datatype::encode(args.datatype1),
647                                                     Datatype::encode(args.datatype2)));
648
649   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
650                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
651                   MPI_COMM_WORLD);
652
653   TRACE_smpi_comm_out(get_pid());
654 }
655
656 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
657 {
658   const GatherArgParser& args = get_args();
659   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
660                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
661                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
662                                                     Datatype::encode(args.datatype2)));
663
664   if (get_name() == "gather") {
665     int rank = MPI_COMM_WORLD->rank();
666     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
667                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
668                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
669   } else
670     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
671                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
672                      MPI_COMM_WORLD);
673
674   TRACE_smpi_comm_out(get_pid());
675 }
676
677 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
678 {
679   int rank = MPI_COMM_WORLD->rank();
680   const GatherVArgParser& args = get_args();
681   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
682                      new simgrid::instr::VarCollTIData(
683                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
684                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
685
686   if (get_name() == "gatherv") {
687     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
688                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
689                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
690   } else {
691     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
692                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
693                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
694   }
695
696   TRACE_smpi_comm_out(get_pid());
697 }
698
699 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
700 {
701   int rank = MPI_COMM_WORLD->rank();
702   const ScatterArgParser& args = get_args();
703   TRACE_smpi_comm_in(get_pid(), "action_scatter",
704                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
705                                                     Datatype::encode(args.datatype1),
706                                                     Datatype::encode(args.datatype2)));
707
708   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
709                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
710                  args.datatype2, args.root, MPI_COMM_WORLD);
711
712   TRACE_smpi_comm_out(get_pid());
713 }
714
715 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
716 {
717   int rank = MPI_COMM_WORLD->rank();
718   const ScatterVArgParser& args = get_args();
719   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
720                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
721                                                        nullptr, Datatype::encode(args.datatype1),
722                                                        Datatype::encode(args.datatype2)));
723
724   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
725                   args.sendcounts->data(), args.disps.data(), args.datatype1,
726                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
727                   MPI_COMM_WORLD);
728
729   TRACE_smpi_comm_out(get_pid());
730 }
731
732 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
733 {
734   const ReduceScatterArgParser& args = get_args();
735   TRACE_smpi_comm_in(
736       get_pid(), "action_reducescatter",
737       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
738                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
739                                         Datatype::encode(args.datatype1)));
740
741   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
742                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
743                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
744
745   private_execute_flops(args.comp_size);
746   TRACE_smpi_comm_out(get_pid());
747 }
748
749 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
750 {
751   const AllToAllVArgParser& args = get_args();
752   TRACE_smpi_comm_in(get_pid(), __func__,
753                      new simgrid::instr::VarCollTIData(
754                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
755                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
756
757   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
758                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
759                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
760
761   TRACE_smpi_comm_out(get_pid());
762 }
763 } // Replay Namespace
764 }} // namespace simgrid::smpi
765
766 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
767 /** @brief Only initialize the replay, don't do it for real */
768 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
769 {
770   xbt_assert(not smpi_process()->initializing());
771
772   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
773   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
774   simgrid::smpi::ActorExt::init();
775
776   smpi_process()->mark_as_initialized();
777   smpi_process()->set_replaying(true);
778
779   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
780   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
781   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
782   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
783   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
784   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
785   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
786   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
787   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
788   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
789   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
790   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
791   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
792   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
793   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
794   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
795   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
796   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
797   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
798   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
799   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
800   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
801   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
802   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
803   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
804   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
805   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
806   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
807   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
808
809   //if we have a delayed start, sleep here.
810   if (start_delay_flops > 0) {
811     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
812     private_execute_flops(start_delay_flops);
813   } else {
814     // Wait for the other actors to initialize also
815     simgrid::s4u::this_actor::yield();
816   }
817 }
818
819 /** @brief actually run the replay after initialization */
820 void smpi_replay_main(int rank, const char* private_trace_filename)
821 {
822   static int active_processes = 0;
823   active_processes++;
824   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
825   std::string rank_string                      = std::to_string(rank);
826   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
827
828   /* and now, finalize everything */
829   /* One active process will stop. Decrease the counter*/
830   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
831   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
832   if (count_requests > 0) {
833     std::vector<MPI_Request> requests(count_requests);
834     unsigned int i=0;
835
836     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
837       requests[i] = pair.second;
838       i++;
839     }
840     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
841   }
842
843   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
844     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
845
846   active_processes--;
847
848   if(active_processes==0){
849     /* Last process alive speaking: end the simulated timer */
850     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
851     smpi_free_replay_tmp_buffers();
852   }
853
854   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
855                      new simgrid::instr::NoOpTIData("finalize"));
856
857   smpi_process()->finalize();
858
859   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
860 }
861
862 /** @brief chain a replay initialization and a replay start */
863 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
864 {
865   smpi_replay_init(instance_id, rank, start_delay_flops);
866   smpi_replay_main(rank, private_trace_filename);
867 }