Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'pikachuyann/simgrid-stoprofiles'
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92   RequestStorage() = default;
93   int size() const { return store.size(); }
94
95   req_storage_t& get_store() { return store; }
96
97   void get_requests(std::vector<MPI_Request>& vec) const
98   {
99     for (auto const& pair : store) {
100       auto& req       = pair.second;
101       auto my_proc_id = simgrid::s4u::this_actor::get_pid();
102       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
103         vec.push_back(pair.second);
104         pair.second->print_request("MM");
105       }
106     }
107   }
108
109     MPI_Request find(int src, int dst, int tag)
110     {
111       auto it = store.find(req_key_t(src, dst, tag));
112       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
113     }
114
115     void remove(const Request* req)
116     {
117       if (req == MPI_REQUEST_NULL) return;
118
119       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
120     }
121
122     void add(MPI_Request req)
123     {
124       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
125         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
126     }
127
128     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
129     void addNullRequest(int src, int dst, int tag)
130     {
131       store.insert({req_key_t(
132             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
133             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
134             tag), MPI_REQUEST_NULL});
135     }
136 };
137
138 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
139 {
140   CHECK_ACTION_PARAMS(action, 3, 0)
141   src = std::stoi(action[2]);
142   dst = std::stoi(action[3]);
143   tag = std::stoi(action[4]);
144 }
145
146 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
147 {
148   CHECK_ACTION_PARAMS(action, 3, 1)
149   partner = std::stoi(action[2]);
150   tag     = std::stoi(action[3]);
151   size    = parse_double(action[4]);
152   if (action.size() > 5)
153     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
154 }
155
156 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
157 {
158   CHECK_ACTION_PARAMS(action, 1, 0)
159   flops = parse_double(action[2]);
160 }
161
162 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   time = parse_double(action[2]);
166 }
167
168 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 2, 0)
171   filename = std::string(action[2]);
172   line = std::stoi(action[3]);
173 }
174
175 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 {
177   CHECK_ACTION_PARAMS(action, 1, 2)
178   size = parse_double(action[2]);
179   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
180   if (action.size() > 4)
181     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
182 }
183
184 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 {
186   CHECK_ACTION_PARAMS(action, 2, 2)
187   comm_size = parse_double(action[2]);
188   comp_size = parse_double(action[3]);
189   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
190   if (action.size() > 5)
191     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
192 }
193
194 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
195 {
196   CHECK_ACTION_PARAMS(action, 2, 1)
197   comm_size = parse_double(action[2]);
198   comp_size = parse_double(action[3]);
199   if (action.size() > 4)
200     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
201 }
202
203 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
204 {
205   CHECK_ACTION_PARAMS(action, 2, 1)
206   comm_size = MPI_COMM_WORLD->size();
207   send_size = parse_double(action[2]);
208   recv_size = parse_double(action[3]);
209
210   if (action.size() > 4)
211     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
212   if (action.size() > 5)
213     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
214 }
215
216 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
217 {
218   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
219         0 gather 68 68 0 0 0
220       where:
221         1) 68 is the sendcounts
222         2) 68 is the recvcounts
223         3) 0 is the root node
224         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
225         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
226   */
227   CHECK_ACTION_PARAMS(action, 2, 3)
228   comm_size = MPI_COMM_WORLD->size();
229   send_size = parse_double(action[2]);
230   recv_size = parse_double(action[3]);
231
232   if (name == "gather") {
233     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
234     if (action.size() > 5)
235       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
236     if (action.size() > 6)
237       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
238   } else {
239     if (action.size() > 4)
240       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
241     if (action.size() > 5)
242       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
243   }
244 }
245
246 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
247 {
248   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
249        0 gather 68 68 10 10 10 0 0 0
250      where:
251        1) 68 is the sendcount
252        2) 68 10 10 10 is the recvcounts
253        3) 0 is the root node
254        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
255        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
256   */
257   comm_size = MPI_COMM_WORLD->size();
258   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
259   send_size  = parse_double(action[2]);
260   disps      = std::vector<int>(comm_size, 0);
261   recvcounts = std::make_shared<std::vector<int>>(comm_size);
262
263   if (name == "gatherv") {
264     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
265     if (action.size() > 4 + comm_size)
266       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
267     if (action.size() > 5 + comm_size)
268       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
269   } else {
270     int disp_index     = 0;
271     /* The 3 comes from "0 gather <sendcount>", which must always be present.
272      * The + comm_size is the recvcounts array, which must also be present
273      */
274     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
275       int datatype_index = 3 + comm_size;
276       disp_index     = datatype_index + 1;
277       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
278       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
279     } else if (action.size() >
280                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
281       disp_index = 3 + comm_size;
282     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
283       int datatype_index = 3 + comm_size;
284       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
285       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
286     }
287
288     if (disp_index != 0) {
289       for (unsigned int i = 0; i < comm_size; i++)
290         disps[i]          = std::stoi(action[disp_index + i]);
291     }
292   }
293
294   for (unsigned int i = 0; i < comm_size; i++) {
295     (*recvcounts)[i] = std::stoi(action[i + 3]);
296   }
297   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
298 }
299
300 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
301 {
302   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
303         0 gather 68 68 0 0 0
304       where:
305         1) 68 is the sendcounts
306         2) 68 is the recvcounts
307         3) 0 is the root node
308         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
309         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
310   */
311   CHECK_ACTION_PARAMS(action, 2, 3)
312   comm_size = MPI_COMM_WORLD->size();
313   send_size = parse_double(action[2]);
314   recv_size = parse_double(action[3]);
315   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
316   if (action.size() > 5)
317     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
318   if (action.size() > 6)
319     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
320 }
321
322 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
323 {
324   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
325      0 gather 68 10 10 10 68 0 0 0
326       where:
327       1) 68 10 10 10 is the sendcounts
328       2) 68 is the recvcount
329       3) 0 is the root node
330       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
331       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
332   */
333   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
334   recv_size  = parse_double(action[2 + comm_size]);
335   disps      = std::vector<int>(comm_size, 0);
336   sendcounts = std::make_shared<std::vector<int>>(comm_size);
337
338   if (action.size() > 5 + comm_size)
339     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
340   if (action.size() > 5 + comm_size)
341     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
342
343   for (unsigned int i = 0; i < comm_size; i++) {
344     (*sendcounts)[i] = std::stoi(action[i + 2]);
345   }
346   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
347   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
348 }
349
350 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
351 {
352   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
353        0 reducescatter 275427 275427 275427 204020 11346849 0
354      where:
355        1) The first four values after the name of the action declare the recvcounts array
356        2) The value 11346849 is the amount of instructions
357        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
358   */
359   comm_size = MPI_COMM_WORLD->size();
360   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
361   comp_size  = parse_double(action[2 + comm_size]);
362   recvcounts = std::make_shared<std::vector<int>>(comm_size);
363   if (action.size() > 3 + comm_size)
364     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
365
366   for (unsigned int i = 0; i < comm_size; i++) {
367     recvcounts->push_back(std::stoi(action[i + 2]));
368   }
369   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
370 }
371
372 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
373 {
374   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
375         0 alltoallv 100 1 7 10 12 100 1 70 10 5
376      where:
377       1) 100 is the size of the send buffer *sizeof(int),
378       2) 1 7 10 12 is the sendcounts array
379       3) 100*sizeof(int) is the size of the receiver buffer
380       4)  1 70 10 5 is the recvcounts array
381   */
382   comm_size = MPI_COMM_WORLD->size();
383   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
384   sendcounts = std::make_shared<std::vector<int>>(comm_size);
385   recvcounts = std::make_shared<std::vector<int>>(comm_size);
386   senddisps  = std::vector<int>(comm_size, 0);
387   recvdisps  = std::vector<int>(comm_size, 0);
388
389   if (action.size() > 5 + 2 * comm_size)
390     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
391   if (action.size() > 5 + 2 * comm_size)
392     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
393
394   send_buf_size = parse_double(action[2]);
395   recv_buf_size = parse_double(action[3 + comm_size]);
396   for (unsigned int i = 0; i < comm_size; i++) {
397     (*sendcounts)[i] = std::stoi(action[3 + i]);
398     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
399   }
400   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
401   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
402 }
403
404 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
405 {
406   std::string s = boost::algorithm::join(action, " ");
407   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
408   const WaitTestParser& args = get_args();
409   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
410   req_storage.remove(request);
411
412   if (request == MPI_REQUEST_NULL) {
413     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
414      * return.*/
415     return;
416   }
417
418   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
419
420   // Must be taken before Request::wait() since the request may be set to
421   // MPI_REQUEST_NULL by Request::wait!
422   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
423   // TODO: Here we take the rank while we normally take the process id (look for get_pid())
424   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
425
426   MPI_Status status;
427   Request::wait(&request, &status);
428
429   TRACE_smpi_comm_out(rank);
430   if (is_wait_for_receive)
431     TRACE_smpi_recv(args.src, args.dst, args.tag);
432 }
433
434 void SendAction::kernel(simgrid::xbt::ReplayAction&)
435 {
436   const SendRecvParser& args = get_args();
437   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
438
439   TRACE_smpi_comm_in(
440       get_pid(), __func__,
441       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
442   if (not TRACE_smpi_view_internals())
443     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
444
445   if (get_name() == "send") {
446     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447   } else if (get_name() == "isend") {
448     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
449     req_storage.add(request);
450   } else {
451     xbt_die("Don't know this action, %s", get_name().c_str());
452   }
453
454   TRACE_smpi_comm_out(get_pid());
455 }
456
457 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
458 {
459   const SendRecvParser& args = get_args();
460   TRACE_smpi_comm_in(
461       get_pid(), __func__,
462       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
463
464   MPI_Status status;
465   // unknown size from the receiver point of view
466   double arg_size = args.size;
467   if (arg_size <= 0.0) {
468     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
469     arg_size = status.count;
470   }
471
472   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
473   if (get_name() == "recv") {
474     is_recv = true;
475     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
476   } else if (get_name() == "irecv") {
477     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
478     req_storage.add(request);
479   } else {
480     THROW_IMPOSSIBLE;
481   }
482
483   TRACE_smpi_comm_out(get_pid());
484   if (is_recv && not TRACE_smpi_view_internals()) {
485     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
486     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
487   }
488 }
489
490 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
491 {
492   const ComputeParser& args = get_args();
493   if (smpi_cfg_simulate_computation()) {
494     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
495   }
496 }
497
498 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
499 {
500   const SleepParser& args = get_args();
501   XBT_DEBUG("Sleep for: %lf secs", args.time);
502   int rank = simgrid::s4u::this_actor::get_pid();
503   TRACE_smpi_sleeping_in(rank, args.time);
504   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
505   TRACE_smpi_sleeping_out(rank);
506 }
507
508 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
509 {
510   const LocationParser& args = get_args();
511   smpi_trace_set_call_location(args.filename.c_str(), args.line);
512 }
513
514 void TestAction::kernel(simgrid::xbt::ReplayAction&)
515 {
516   const WaitTestParser& args = get_args();
517   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
518   req_storage.remove(request);
519   // if request is null here, this may mean that a previous test has succeeded
520   // Different times in traced application and replayed version may lead to this
521   // In this case, ignore the extra calls.
522   if (request != MPI_REQUEST_NULL) {
523     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
524
525     MPI_Status status;
526     int flag = 0;
527     Request::test(&request, &status, &flag);
528
529     XBT_DEBUG("MPI_Test result: %d", flag);
530     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
531      * nullptr.*/
532     if (request == MPI_REQUEST_NULL)
533       req_storage.addNullRequest(args.src, args.dst, args.tag);
534     else
535       req_storage.add(request);
536
537     TRACE_smpi_comm_out(get_pid());
538   }
539 }
540
541 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
542 {
543   CHECK_ACTION_PARAMS(action, 0, 1)
544     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
545     : MPI_BYTE;  // default TAU datatype
546
547   /* start a simulated timer */
548   smpi_process()->simulated_start();
549 }
550
551 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
552 {
553   /* nothing to do */
554 }
555
556 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
557 {
558   const unsigned int count_requests = req_storage.size();
559
560   if (count_requests > 0) {
561     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
562     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
563     std::vector<MPI_Request> reqs;
564     req_storage.get_requests(reqs);
565     for (auto const& req : reqs) {
566       if (req && (req->flags() & MPI_REQ_RECV)) {
567         sender_receiver.emplace_back(req->src(), req->dst());
568       }
569     }
570     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
571     req_storage.get_store().clear();
572
573     for (auto const& pair : sender_receiver) {
574       TRACE_smpi_recv(pair.first, pair.second, 0);
575     }
576     TRACE_smpi_comm_out(get_pid());
577   }
578 }
579
580 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
581 {
582   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
583   colls::barrier(MPI_COMM_WORLD);
584   TRACE_smpi_comm_out(get_pid());
585 }
586
587 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
588 {
589   const BcastArgParser& args = get_args();
590   TRACE_smpi_comm_in(get_pid(), "action_bcast",
591                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(), -1.0,
592                                                     args.size, -1, Datatype::encode(args.datatype1), ""));
593
594   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
595
596   TRACE_smpi_comm_out(get_pid());
597 }
598
599 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
600 {
601   const ReduceArgParser& args = get_args();
602   TRACE_smpi_comm_in(get_pid(), "action_reduce",
603                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
604                                                     args.comp_size, args.comm_size, -1,
605                                                     Datatype::encode(args.datatype1), ""));
606
607   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
608                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
609                 args.root, MPI_COMM_WORLD);
610   private_execute_flops(args.comp_size);
611
612   TRACE_smpi_comm_out(get_pid());
613 }
614
615 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
616 {
617   const AllReduceArgParser& args = get_args();
618   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
619                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
620                                                     Datatype::encode(args.datatype1), ""));
621
622   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
623                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
624                    MPI_COMM_WORLD);
625   private_execute_flops(args.comp_size);
626
627   TRACE_smpi_comm_out(get_pid());
628 }
629
630 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
631 {
632   const AllToAllArgParser& args = get_args();
633   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
634                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
635                                                     Datatype::encode(args.datatype1),
636                                                     Datatype::encode(args.datatype2)));
637
638   colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
639                   recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
640                   MPI_COMM_WORLD);
641
642   TRACE_smpi_comm_out(get_pid());
643 }
644
645 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
646 {
647   const GatherArgParser& args = get_args();
648   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
649                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
650                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
651                                                     Datatype::encode(args.datatype2)));
652
653   if (get_name() == "gather") {
654     int rank = MPI_COMM_WORLD->rank();
655     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
656                   (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
657                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
658   } else
659     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
660                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
661                      MPI_COMM_WORLD);
662
663   TRACE_smpi_comm_out(get_pid());
664 }
665
666 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
667 {
668   int rank = MPI_COMM_WORLD->rank();
669   const GatherVArgParser& args = get_args();
670   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
671                      new simgrid::instr::VarCollTIData(
672                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
673                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
674
675   if (get_name() == "gatherv") {
676     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
677                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
678                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
679   } else {
680     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
681                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
682                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
683   }
684
685   TRACE_smpi_comm_out(get_pid());
686 }
687
688 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
689 {
690   int rank = MPI_COMM_WORLD->rank();
691   const ScatterArgParser& args = get_args();
692   TRACE_smpi_comm_in(get_pid(), "action_scatter",
693                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
694                                                     Datatype::encode(args.datatype1),
695                                                     Datatype::encode(args.datatype2)));
696
697   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
698                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
699                  args.datatype2, args.root, MPI_COMM_WORLD);
700
701   TRACE_smpi_comm_out(get_pid());
702 }
703
704 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
705 {
706   int rank = MPI_COMM_WORLD->rank();
707   const ScatterVArgParser& args = get_args();
708   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
709                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
710                                                        nullptr, Datatype::encode(args.datatype1),
711                                                        Datatype::encode(args.datatype2)));
712
713   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
714                   args.sendcounts->data(), args.disps.data(), args.datatype1,
715                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
716                   MPI_COMM_WORLD);
717
718   TRACE_smpi_comm_out(get_pid());
719 }
720
721 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
722 {
723   const ReduceScatterArgParser& args = get_args();
724   TRACE_smpi_comm_in(
725       get_pid(), "action_reducescatter",
726       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
727                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
728                                         Datatype::encode(args.datatype1)));
729
730   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
731                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
732                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
733
734   private_execute_flops(args.comp_size);
735   TRACE_smpi_comm_out(get_pid());
736 }
737
738 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
739 {
740   const AllToAllVArgParser& args = get_args();
741   TRACE_smpi_comm_in(get_pid(), __func__,
742                      new simgrid::instr::VarCollTIData(
743                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
744                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
745
746   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
747                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
748                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
749
750   TRACE_smpi_comm_out(get_pid());
751 }
752 } // Replay Namespace
753 }} // namespace simgrid::smpi
754
755 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
756 /** @brief Only initialize the replay, don't do it for real */
757 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
758 {
759   xbt_assert(not smpi_process()->initializing());
760
761   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
762   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
763   simgrid::smpi::ActorExt::init();
764
765   smpi_process()->mark_as_initialized();
766   smpi_process()->set_replaying(true);
767
768   int my_proc_id = simgrid::s4u::this_actor::get_pid();
769
770   TRACE_smpi_init(my_proc_id, "smpi_replay_run_init");
771   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
772   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
773   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
784   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
785   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
786   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
787   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
788   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
789   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
790   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
791   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
792   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
793   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
794   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
795   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
796   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
797   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
798   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
799
800   //if we have a delayed start, sleep here.
801   if (start_delay_flops > 0) {
802     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
803     private_execute_flops(start_delay_flops);
804   } else {
805     // Wait for the other actors to initialize also
806     simgrid::s4u::this_actor::yield();
807   }
808 }
809
810 /** @brief actually run the replay after initialization */
811 void smpi_replay_main(int rank, const char* trace_filename)
812 {
813   static int active_processes = 0;
814   active_processes++;
815   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
816   std::string rank_string                      = std::to_string(rank);
817   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
818
819   /* and now, finalize everything */
820   /* One active process will stop. Decrease the counter*/
821   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
822   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
823   if (count_requests > 0) {
824     auto* requests = new MPI_Request[count_requests];
825     unsigned int i=0;
826
827     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
828       requests[i] = pair.second;
829       i++;
830     }
831     simgrid::smpi::Request::waitall(count_requests, requests, MPI_STATUSES_IGNORE);
832     delete[] requests;
833   }
834   active_processes--;
835
836   if(active_processes==0){
837     /* Last process alive speaking: end the simulated timer */
838     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
839     smpi_free_replay_tmp_buffers();
840   }
841
842   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
843                      new simgrid::instr::NoOpTIData("finalize"));
844
845   smpi_process()->finalize();
846
847   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
848 }
849
850 /** @brief chain a replay initialization and a replay start */
851 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
852 {
853   smpi_replay_init(instance_id, rank, start_delay_flops);
854   smpi_replay_main(rank, trace_filename);
855 }