Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
allow smpi/init option in replay as well.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "smpi_config.hpp"
12 #include "simgrid/s4u/Exec.hpp"
13 #include "xbt/replay.hpp"
14 #include <simgrid/smpi/smpi_replay.hpp>
15 #include <src/smpi/include/private.hpp>
16
17 #include <cmath>
18 #include <limits>
19 #include <memory>
20 #include <numeric>
21 #include <tuple>
22 #include <unordered_map>
23 #include <vector>
24
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
26
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
30 namespace hash_tuple {
31 template <typename TT> class hash {
32 public:
33   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
34 };
35
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
37 {
38   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
39 }
40
41 // Recursive template code derived from Matthieu M.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
43 public:
44   static void apply(size_t& seed, Tuple const& tuple)
45   {
46     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47     hash_combine(seed, std::get<Index>(tuple));
48   }
49 };
50
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
52 public:
53   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
54 };
55
56 template <typename... TT> class hash<std::tuple<TT...>> {
57 public:
58   size_t operator()(std::tuple<TT...> const& tt) const
59   {
60     size_t seed = 0;
61     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
62     return seed;
63   }
64 };
65 }
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper functions */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "not a double");
79 }
80
81 template <typename T> static T parse_integer(const std::string& string)
82 {
83   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85                  val <= static_cast<double>(std::numeric_limits<T>::max()),
86              "out of range: %g", val);
87   return static_cast<T>(val);
88 }
89
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 {
92   return i < action.size() ? std::stoi(action[i]) : 0;
93 }
94
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 {
97   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
98 }
99
100 namespace simgrid {
101 namespace smpi {
102
103 namespace replay {
104 MPI_Datatype MPI_DEFAULT_TYPE;
105
106 class RequestStorage {
107 private:
108   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
109   using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
110
111   req_storage_t store;
112
113 public:
114   RequestStorage() = default;
115   size_t size() const { return store.size(); }
116
117   req_storage_t& get_store() { return store; }
118
119   void get_requests(std::vector<MPI_Request>& vec) const
120   {
121     for (auto const& pair : store) {
122       auto& req       = pair.second;
123       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
124       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
125         vec.push_back(pair.second);
126         pair.second->print_request("MM");
127       }
128     }
129   }
130
131     MPI_Request find(int src, int dst, int tag)
132     {
133       auto it = store.find(req_key_t(src, dst, tag));
134       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
135     }
136
137     void remove(const Request* req)
138     {
139       if (req == MPI_REQUEST_NULL) return;
140
141       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
142     }
143
144     void add(MPI_Request req)
145     {
146       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
147         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
148     }
149
150     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
151     void addNullRequest(int src, int dst, int tag)
152     {
153       store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
154                     MPI_REQUEST_NULL});
155     }
156 };
157
158 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
159 {
160   CHECK_ACTION_PARAMS(action, 3, 0)
161   src = std::stoi(action[2]);
162   dst = std::stoi(action[3]);
163   tag = std::stoi(action[4]);
164 }
165
166 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
167 {
168   CHECK_ACTION_PARAMS(action, 3, 1)
169   partner = std::stoi(action[2]);
170   tag     = std::stoi(action[3]);
171   size      = parse_integer<size_t>(action[4]);
172   datatype1 = parse_datatype(action, 5);
173 }
174
175 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 {
177   CHECK_ACTION_PARAMS(action, 1, 0)
178   flops = parse_double(action[2]);
179 }
180
181 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 {
183   CHECK_ACTION_PARAMS(action, 1, 0)
184   time = parse_double(action[2]);
185 }
186
187 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
188 {
189   CHECK_ACTION_PARAMS(action, 2, 0)
190   filename = std::string(action[2]);
191   line = std::stoi(action[3]);
192 }
193
194 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
195 {
196   CHECK_ACTION_PARAMS(action, 1, 2)
197   size      = parse_integer<size_t>(action[2]);
198   root      = parse_root(action, 3);
199   datatype1 = parse_datatype(action, 4);
200 }
201
202 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
203 {
204   CHECK_ACTION_PARAMS(action, 2, 2)
205   comm_size = parse_integer<unsigned>(action[2]);
206   comp_size = parse_double(action[3]);
207   root      = parse_root(action, 4);
208   datatype1 = parse_datatype(action, 5);
209 }
210
211 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
212 {
213   CHECK_ACTION_PARAMS(action, 2, 1)
214   comm_size = parse_integer<unsigned>(action[2]);
215   comp_size = parse_double(action[3]);
216   datatype1 = parse_datatype(action, 4);
217 }
218
219 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
220 {
221   CHECK_ACTION_PARAMS(action, 2, 1)
222   comm_size = MPI_COMM_WORLD->size();
223   send_size = parse_integer<int>(action[2]);
224   recv_size = parse_integer<int>(action[3]);
225   datatype1 = parse_datatype(action, 4);
226   datatype2 = parse_datatype(action, 5);
227 }
228
229 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
230 {
231   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
232         0 gather 68 68 0 0 0
233       where:
234         1) 68 is the sendcounts
235         2) 68 is the recvcounts
236         3) 0 is the root node
237         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
238         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
239   */
240   CHECK_ACTION_PARAMS(action, 2, 3)
241   comm_size = MPI_COMM_WORLD->size();
242   send_size = parse_integer<int>(action[2]);
243   recv_size = parse_integer<int>(action[3]);
244
245   if (name == "gather") {
246     root      = parse_root(action, 4);
247     datatype1 = parse_datatype(action, 5);
248     datatype2 = parse_datatype(action, 6);
249   } else {
250     root      = 0;
251     datatype1 = parse_datatype(action, 4);
252     datatype2 = parse_datatype(action, 5);
253   }
254 }
255
256 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
257 {
258   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
259        0 gather 68 68 10 10 10 0 0 0
260      where:
261        1) 68 is the sendcount
262        2) 68 10 10 10 is the recvcounts
263        3) 0 is the root node
264        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
265        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
266   */
267   comm_size = MPI_COMM_WORLD->size();
268   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
269   send_size  = parse_integer<int>(action[2]);
270   disps      = std::vector<int>(comm_size, 0);
271   recvcounts = std::make_shared<std::vector<int>>(comm_size);
272
273   if (name == "gatherv") {
274     root      = parse_root(action, 3 + comm_size);
275     datatype1 = parse_datatype(action, 4 + comm_size);
276     datatype2 = parse_datatype(action, 5 + comm_size);
277   } else {
278     root                = 0;
279     unsigned disp_index = 0;
280     /* The 3 comes from "0 gather <sendcount>", which must always be present.
281      * The + comm_size is the recvcounts array, which must also be present
282      */
283     if (action.size() > 3 + comm_size + comm_size) {
284       // datatype + disp are specified
285       datatype1  = parse_datatype(action, 3 + comm_size);
286       datatype2  = parse_datatype(action, 4 + comm_size);
287       disp_index = 5 + comm_size;
288     } else if (action.size() > 3 + comm_size + 2) {
289       // disps specified; datatype is not specified; use the default one
290       datatype1  = MPI_DEFAULT_TYPE;
291       datatype2  = MPI_DEFAULT_TYPE;
292       disp_index = 3 + comm_size;
293     } else {
294       // no disp specified, maybe only datatype,
295       datatype1 = parse_datatype(action, 3 + comm_size);
296       datatype2 = parse_datatype(action, 4 + comm_size);
297     }
298
299     if (disp_index != 0) {
300       xbt_assert(disp_index + comm_size <= action.size());
301       for (unsigned i = 0; i < comm_size; i++)
302         disps[i]          = std::stoi(action[disp_index + i]);
303     }
304   }
305
306   for (unsigned int i = 0; i < comm_size; i++) {
307     (*recvcounts)[i] = std::stoi(action[i + 3]);
308   }
309   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
310 }
311
312 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
313 {
314   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
315         0 gather 68 68 0 0 0
316       where:
317         1) 68 is the sendcounts
318         2) 68 is the recvcounts
319         3) 0 is the root node
320         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
321         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
322   */
323   comm_size = MPI_COMM_WORLD->size();
324   CHECK_ACTION_PARAMS(action, 2, 3)
325   comm_size = MPI_COMM_WORLD->size();
326   send_size = parse_integer<int>(action[2]);
327   recv_size = parse_integer<int>(action[3]);
328   root      = parse_root(action, 4);
329   datatype1 = parse_datatype(action, 5);
330   datatype2 = parse_datatype(action, 6);
331 }
332
333 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
334 {
335   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
336      0 gather 68 10 10 10 68 0 0 0
337       where:
338       1) 68 10 10 10 is the sendcounts
339       2) 68 is the recvcount
340       3) 0 is the root node
341       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
342       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
343   */
344   comm_size = MPI_COMM_WORLD->size();
345   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
346   recv_size  = parse_integer<int>(action[2 + comm_size]);
347   disps      = std::vector<int>(comm_size, 0);
348   sendcounts = std::make_shared<std::vector<int>>(comm_size);
349
350   root      = parse_root(action, 3 + comm_size);
351   datatype1 = parse_datatype(action, 4 + comm_size);
352   datatype2 = parse_datatype(action, 5 + comm_size);
353
354   for (unsigned int i = 0; i < comm_size; i++) {
355     (*sendcounts)[i] = std::stoi(action[i + 2]);
356   }
357   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
358 }
359
360 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
361 {
362   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
363        0 reducescatter 275427 275427 275427 204020 11346849 0
364      where:
365        1) The first four values after the name of the action declare the recvcounts array
366        2) The value 11346849 is the amount of instructions
367        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
368   */
369   comm_size = MPI_COMM_WORLD->size();
370   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
371   comp_size  = parse_double(action[2 + comm_size]);
372   recvcounts = std::make_shared<std::vector<int>>(comm_size);
373   datatype1  = parse_datatype(action, 3 + comm_size);
374
375   for (unsigned int i = 0; i < comm_size; i++) {
376     (*recvcounts)[i]= std::stoi(action[i + 2]);
377   }
378   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
379 }
380
381 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
382 {
383   CHECK_ACTION_PARAMS(action, 2, 1)
384   size      = parse_integer<size_t>(action[2]);
385   comp_size = parse_double(action[3]);
386   datatype1 = parse_datatype(action, 4);
387 }
388
389 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
390 {
391   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
392         0 alltoallv 100 1 7 10 12 100 1 70 10 5
393      where:
394       1) 100 is the size of the send buffer *sizeof(int),
395       2) 1 7 10 12 is the sendcounts array
396       3) 100*sizeof(int) is the size of the receiver buffer
397       4)  1 70 10 5 is the recvcounts array
398   */
399   comm_size = MPI_COMM_WORLD->size();
400   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
401   sendcounts = std::make_shared<std::vector<int>>(comm_size);
402   recvcounts = std::make_shared<std::vector<int>>(comm_size);
403   senddisps  = std::vector<int>(comm_size, 0);
404   recvdisps  = std::vector<int>(comm_size, 0);
405
406   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
407   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
408
409   send_buf_size = parse_integer<int>(action[2]);
410   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
411   for (unsigned int i = 0; i < comm_size; i++) {
412     (*sendcounts)[i] = std::stoi(action[3 + i]);
413     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
414   }
415   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
416   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
417 }
418
419 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
420 {
421   std::string s = boost::algorithm::join(action, " ");
422   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
423   const WaitTestParser& args = get_args();
424   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
425   req_storage.remove(request);
426
427   if (request == MPI_REQUEST_NULL) {
428     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
429      * return.*/
430     return;
431   }
432
433   // Must be taken before Request::wait() since the request may be set to
434   // MPI_REQUEST_NULL by Request::wait!
435   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
436
437   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
438
439   MPI_Status status;
440   Request::wait(&request, &status);
441
442   TRACE_smpi_comm_out(get_pid());
443   if (is_wait_for_receive)
444     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
445 }
446
447 void SendAction::kernel(simgrid::xbt::ReplayAction&)
448 {
449   const SendRecvParser& args = get_args();
450   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
451
452   TRACE_smpi_comm_in(
453       get_pid(), __func__,
454       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
455   if (not TRACE_smpi_view_internals())
456     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
457
458   if (get_name() == "send") {
459     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
460   } else if (get_name() == "isend") {
461     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
462     req_storage.add(request);
463   } else {
464     xbt_die("Don't know this action, %s", get_name().c_str());
465   }
466
467   TRACE_smpi_comm_out(get_pid());
468 }
469
470 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
471 {
472   const SendRecvParser& args = get_args();
473   TRACE_smpi_comm_in(
474       get_pid(), __func__,
475       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
476
477   MPI_Status status;
478   // unknown size from the receiver point of view
479   size_t arg_size = args.size;
480   if (arg_size == 0) {
481     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
482     arg_size = status.count;
483   }
484
485   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
486   if (get_name() == "recv") {
487     is_recv = true;
488     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
489   } else if (get_name() == "irecv") {
490     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
491     req_storage.add(request);
492   } else {
493     THROW_IMPOSSIBLE;
494   }
495
496   TRACE_smpi_comm_out(get_pid());
497   if (is_recv && not TRACE_smpi_view_internals()) {
498     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
499     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
500   }
501 }
502
503 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
504 {
505   const ComputeParser& args = get_args();
506   if (smpi_cfg_simulate_computation()) {
507     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
508   }
509 }
510
511 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
512 {
513   const SleepParser& args = get_args();
514   XBT_DEBUG("Sleep for: %lf secs", args.time);
515   aid_t pid = simgrid::s4u::this_actor::get_pid();
516   TRACE_smpi_sleeping_in(pid, args.time);
517   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
518   TRACE_smpi_sleeping_out(pid);
519 }
520
521 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
522 {
523   const LocationParser& args = get_args();
524   smpi_trace_set_call_location(args.filename.c_str(), args.line);
525 }
526
527 void TestAction::kernel(simgrid::xbt::ReplayAction&)
528 {
529   const WaitTestParser& args = get_args();
530   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
531   req_storage.remove(request);
532   // if request is null here, this may mean that a previous test has succeeded
533   // Different times in traced application and replayed version may lead to this
534   // In this case, ignore the extra calls.
535   if (request != MPI_REQUEST_NULL) {
536     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
537
538     MPI_Status status;
539     int flag = 0;
540     Request::test(&request, &status, &flag);
541
542     XBT_DEBUG("MPI_Test result: %d", flag);
543     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
544      * nullptr.*/
545     if (request == MPI_REQUEST_NULL)
546       req_storage.addNullRequest(args.src, args.dst, args.tag);
547     else
548       req_storage.add(request);
549
550     TRACE_smpi_comm_out(get_pid());
551   }
552 }
553
554 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
555 {
556   CHECK_ACTION_PARAMS(action, 0, 1)
557     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
558     : MPI_BYTE;  // default TAU datatype
559
560   /* start a simulated timer */
561   smpi_process()->simulated_start();
562 }
563
564 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
565 {
566   /* nothing to do */
567 }
568
569 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
570 {
571   const size_t count_requests = req_storage.size();
572
573   if (count_requests > 0) {
574     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
575     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
576     std::vector<MPI_Request> reqs;
577     req_storage.get_requests(reqs);
578     for (auto const& req : reqs) {
579       if (req && (req->flags() & MPI_REQ_RECV)) {
580         sender_receiver.emplace_back(req->src(), req->dst());
581       }
582     }
583     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
584     req_storage.get_store().clear();
585
586     for (auto const& pair : sender_receiver) {
587       TRACE_smpi_recv(pair.first, pair.second, 0);
588     }
589     TRACE_smpi_comm_out(get_pid());
590   }
591 }
592
593 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
594 {
595   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
596   colls::barrier(MPI_COMM_WORLD);
597   TRACE_smpi_comm_out(get_pid());
598 }
599
600 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
601 {
602   const BcastArgParser& args = get_args();
603   TRACE_smpi_comm_in(get_pid(), "action_bcast",
604                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
605                                                     0, Datatype::encode(args.datatype1), ""));
606
607   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
608
609   TRACE_smpi_comm_out(get_pid());
610 }
611
612 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
613 {
614   const ReduceArgParser& args = get_args();
615   TRACE_smpi_comm_in(get_pid(), "action_reduce",
616                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
617                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
618
619   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
620                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
621                 args.root, MPI_COMM_WORLD);
622   if (args.comp_size != 0.0)
623     simgrid::s4u::this_actor::exec_init(args.comp_size)
624       ->set_name("computation")
625       ->start()
626       ->wait();
627
628   TRACE_smpi_comm_out(get_pid());
629 }
630
631 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
632 {
633   const AllReduceArgParser& args = get_args();
634   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
635                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
636                                                     Datatype::encode(args.datatype1), ""));
637
638   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
639                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
640                    MPI_COMM_WORLD);
641   if (args.comp_size != 0.0)
642     simgrid::s4u::this_actor::exec_init(args.comp_size)
643       ->set_name("computation")
644       ->start()
645       ->wait();
646
647   TRACE_smpi_comm_out(get_pid());
648 }
649
650 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
651 {
652   const AllToAllArgParser& args = get_args();
653   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
654                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
655                                                     Datatype::encode(args.datatype1),
656                                                     Datatype::encode(args.datatype2)));
657
658   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
659                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
660                   MPI_COMM_WORLD);
661
662   TRACE_smpi_comm_out(get_pid());
663 }
664
665 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
666 {
667   const GatherArgParser& args = get_args();
668   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
669                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
670                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
671                                                     Datatype::encode(args.datatype2)));
672
673   if (get_name() == "gather") {
674     int rank = MPI_COMM_WORLD->rank();
675     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
676                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
677                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
678   } else
679     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
681                      MPI_COMM_WORLD);
682
683   TRACE_smpi_comm_out(get_pid());
684 }
685
686 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
687 {
688   int rank = MPI_COMM_WORLD->rank();
689   const GatherVArgParser& args = get_args();
690   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
691                      new simgrid::instr::VarCollTIData(
692                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
693                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
694
695   if (get_name() == "gatherv") {
696     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
698                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
699   } else {
700     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
702                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
703   }
704
705   TRACE_smpi_comm_out(get_pid());
706 }
707
708 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
709 {
710   int rank = MPI_COMM_WORLD->rank();
711   const ScatterArgParser& args = get_args();
712   TRACE_smpi_comm_in(get_pid(), "action_scatter",
713                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
714                                                     Datatype::encode(args.datatype1),
715                                                     Datatype::encode(args.datatype2)));
716
717   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
718                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
719                  args.datatype2, args.root, MPI_COMM_WORLD);
720
721   TRACE_smpi_comm_out(get_pid());
722 }
723
724 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
725 {
726   int rank = MPI_COMM_WORLD->rank();
727   const ScatterVArgParser& args = get_args();
728   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
729                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
730                                                        nullptr, Datatype::encode(args.datatype1),
731                                                        Datatype::encode(args.datatype2)));
732
733   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
734                   args.sendcounts->data(), args.disps.data(), args.datatype1,
735                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
736                   MPI_COMM_WORLD);
737
738   TRACE_smpi_comm_out(get_pid());
739 }
740
741 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
742 {
743   const ReduceScatterArgParser& args = get_args();
744   TRACE_smpi_comm_in(
745       get_pid(), "action_reducescatter",
746       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
747                                         std::to_string(args.comp_size),
748                                         Datatype::encode(args.datatype1)));
749
750   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
751                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
752                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
753   if (args.comp_size != 0.0)
754     simgrid::s4u::this_actor::exec_init(args.comp_size)
755       ->set_name("computation")
756       ->start()
757       ->wait();
758   TRACE_smpi_comm_out(get_pid());
759 }
760
761 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
762 {
763   const ScanArgParser& args = get_args();
764   TRACE_smpi_comm_in(get_pid(), "action_scan",
765                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
766                      args.size, 0, Datatype::encode(args.datatype1), ""));
767   if (get_name() == "scan")
768     colls::scan(send_buffer(args.size * args.datatype1->size()),
769               recv_buffer(args.size * args.datatype1->size()), args.size,
770               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
771   else
772     colls::exscan(send_buffer(args.size * args.datatype1->size()),
773               recv_buffer(args.size * args.datatype1->size()), args.size,
774               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
775
776   if (args.comp_size != 0.0)
777     simgrid::s4u::this_actor::exec_init(args.comp_size)
778       ->set_name("computation")
779       ->start()
780       ->wait();
781   TRACE_smpi_comm_out(get_pid());
782 }
783
784 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
785 {
786   const AllToAllVArgParser& args = get_args();
787   TRACE_smpi_comm_in(get_pid(), __func__,
788                      new simgrid::instr::VarCollTIData(
789                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
790                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
791
792   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
793                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
794                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
795
796   TRACE_smpi_comm_out(get_pid());
797 }
798 } // Replay Namespace
799 }} // namespace simgrid::smpi
800
801 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
802 /** @brief Only initialize the replay, don't do it for real */
803 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
804 {
805   xbt_assert(not smpi_process()->initializing());
806
807   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
808   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
809   simgrid::smpi::ActorExt::init();
810
811   smpi_process()->mark_as_initialized();
812   smpi_process()->set_replaying(true);
813
814   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
815   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
816   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
817   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
818   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
819   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
821   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
822   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
823   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
824   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
825   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
826   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
827   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
828   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
829   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
830   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
831   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
832   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
833   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
834   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
835   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
836   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
837   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
838   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
839   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
840   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
841   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
842   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
843   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
844   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
845
846   //if we have a delayed start, sleep here.
847   if (start_delay_flops > 0) {
848     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
849     private_execute_flops(start_delay_flops);
850   } else {
851     // Wait for the other actors to initialize also
852     simgrid::s4u::this_actor::yield();
853   }
854   if(_smpi_init_sleep > 0)
855     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
856 }
857
858 /** @brief actually run the replay after initialization */
859 void smpi_replay_main(int rank, const char* private_trace_filename)
860 {
861   static int active_processes = 0;
862   active_processes++;
863   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
864   std::string rank_string                      = std::to_string(rank);
865   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
866
867   /* and now, finalize everything */
868   /* One active process will stop. Decrease the counter*/
869   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
870   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
871   if (count_requests > 0) {
872     std::vector<MPI_Request> requests(count_requests);
873     unsigned int i=0;
874
875     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
876       requests[i] = pair.second;
877       i++;
878     }
879     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
880   }
881
882   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
883     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
884
885   active_processes--;
886
887   if(active_processes==0){
888     /* Last process alive speaking: end the simulated timer */
889     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
890     smpi_free_replay_tmp_buffers();
891   }
892
893   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
894                      new simgrid::instr::NoOpTIData("finalize"));
895
896   smpi_process()->finalize();
897
898   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
899 }
900
901 /** @brief chain a replay initialization and a replay start */
902 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
903 {
904   smpi_replay_init(instance_id, rank, start_delay_flops);
905   smpi_replay_main(rank, private_trace_filename);
906 }