Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[pvs-studio] Don't mix initialized and non-initialized fields.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <cmath>
16 #include <limits>
17 #include <memory>
18 #include <numeric>
19 #include <tuple>
20 #include <unordered_map>
21 #include <vector>
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24
25 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
26 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
27 // this could go into a header file.
28 namespace hash_tuple {
29 template <typename TT> class hash {
30 public:
31   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
32 };
33
34 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
35 {
36   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
37 }
38
39 // Recursive template code derived from Matthieu M.
40 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
41 public:
42   static void apply(size_t& seed, Tuple const& tuple)
43   {
44     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
45     hash_combine(seed, std::get<Index>(tuple));
46   }
47 };
48
49 template <class Tuple> class HashValueImpl<Tuple, 0> {
50 public:
51   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
52 };
53
54 template <typename... TT> class hash<std::tuple<TT...>> {
55 public:
56   size_t operator()(std::tuple<TT...> const& tt) const
57   {
58     size_t seed = 0;
59     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
60     return seed;
61   }
62 };
63 }
64
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
66 {
67   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68     std::string s = boost::algorithm::join(action, " ");
69     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
70   }
71 }
72
73 /* Helper functions */
74 static double parse_double(const std::string& string)
75 {
76   return xbt_str_parse_double(string.c_str(), "not a double");
77 }
78
79 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
80 {
81   return i < action.size() ? std::stoi(action[i]) : 0;
82 }
83
84 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
85 {
86   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
87 }
88
89 namespace simgrid {
90 namespace smpi {
91
92 namespace replay {
93 MPI_Datatype MPI_DEFAULT_TYPE;
94
95 class RequestStorage {
96 private:
97   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
98   using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
99
100   req_storage_t store;
101
102 public:
103   RequestStorage() = default;
104   size_t size() const { return store.size(); }
105
106   req_storage_t& get_store() { return store; }
107
108   void get_requests(std::vector<MPI_Request>& vec) const
109   {
110     for (auto const& pair : store) {
111       auto& req       = pair.second;
112       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
113       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
114         vec.push_back(pair.second);
115         pair.second->print_request("MM");
116       }
117     }
118   }
119
120     MPI_Request find(int src, int dst, int tag)
121     {
122       auto it = store.find(req_key_t(src, dst, tag));
123       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
124     }
125
126     void remove(const Request* req)
127     {
128       if (req == MPI_REQUEST_NULL) return;
129
130       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
131     }
132
133     void add(MPI_Request req)
134     {
135       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
136         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
137     }
138
139     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
140     void addNullRequest(int src, int dst, int tag)
141     {
142       store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
143                     MPI_REQUEST_NULL});
144     }
145 };
146
147 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
148 {
149   CHECK_ACTION_PARAMS(action, 3, 0)
150   src = std::stoi(action[2]);
151   dst = std::stoi(action[3]);
152   tag = std::stoi(action[4]);
153 }
154
155 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
156 {
157   CHECK_ACTION_PARAMS(action, 3, 1)
158   partner = std::stoi(action[2]);
159   tag     = std::stoi(action[3]);
160   size    = parse_double(action[4]);
161   datatype1 = parse_datatype(action, 5);
162 }
163
164 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
165 {
166   CHECK_ACTION_PARAMS(action, 1, 0)
167   flops = parse_double(action[2]);
168 }
169
170 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
171 {
172   CHECK_ACTION_PARAMS(action, 1, 0)
173   time = parse_double(action[2]);
174 }
175
176 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
177 {
178   CHECK_ACTION_PARAMS(action, 2, 0)
179   filename = std::string(action[2]);
180   line = std::stoi(action[3]);
181 }
182
183 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
184 {
185   CHECK_ACTION_PARAMS(action, 1, 2)
186   size = parse_double(action[2]);
187   root      = parse_root(action, 3);
188   datatype1 = parse_datatype(action, 4);
189 }
190
191 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
192 {
193   CHECK_ACTION_PARAMS(action, 2, 2)
194   double arg2 = trunc(parse_double(action[2]));
195   xbt_assert(0.0 <= arg2 && arg2 <= static_cast<double>(std::numeric_limits<unsigned>::max()));
196   comm_size = static_cast<unsigned>(arg2);
197   comp_size = parse_double(action[3]);
198   root      = parse_root(action, 4);
199   datatype1 = parse_datatype(action, 5);
200 }
201
202 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
203 {
204   CHECK_ACTION_PARAMS(action, 2, 1)
205   double arg2 = trunc(parse_double(action[2]));
206   xbt_assert(0.0 <= arg2 && arg2 <= static_cast<double>(std::numeric_limits<unsigned>::max()));
207   comm_size = static_cast<unsigned>(arg2);
208   comp_size = parse_double(action[3]);
209   datatype1 = parse_datatype(action, 4);
210 }
211
212 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
213 {
214   CHECK_ACTION_PARAMS(action, 2, 1)
215   comm_size = MPI_COMM_WORLD->size();
216   send_size = parse_double(action[2]);
217   recv_size = parse_double(action[3]);
218   datatype1 = parse_datatype(action, 4);
219   datatype2 = parse_datatype(action, 5);
220 }
221
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
223 {
224   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
225         0 gather 68 68 0 0 0
226       where:
227         1) 68 is the sendcounts
228         2) 68 is the recvcounts
229         3) 0 is the root node
230         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
232   */
233   CHECK_ACTION_PARAMS(action, 2, 3)
234   comm_size = MPI_COMM_WORLD->size();
235   send_size = parse_double(action[2]);
236   recv_size = parse_double(action[3]);
237
238   if (name == "gather") {
239     root      = parse_root(action, 4);
240     datatype1 = parse_datatype(action, 5);
241     datatype2 = parse_datatype(action, 6);
242   } else {
243     root      = 0;
244     datatype1 = parse_datatype(action, 4);
245     datatype2 = parse_datatype(action, 5);
246   }
247 }
248
249 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
250 {
251   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
252        0 gather 68 68 10 10 10 0 0 0
253      where:
254        1) 68 is the sendcount
255        2) 68 10 10 10 is the recvcounts
256        3) 0 is the root node
257        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
258        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
259   */
260   comm_size = MPI_COMM_WORLD->size();
261   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
262   send_size  = parse_double(action[2]);
263   disps      = std::vector<int>(comm_size, 0);
264   recvcounts = std::make_shared<std::vector<int>>(comm_size);
265
266   if (name == "gatherv") {
267     root      = parse_root(action, 3 + comm_size);
268     datatype1 = parse_datatype(action, 4 + comm_size);
269     datatype2 = parse_datatype(action, 5 + comm_size);
270   } else {
271     root                = 0;
272     unsigned disp_index = 0;
273     /* The 3 comes from "0 gather <sendcount>", which must always be present.
274      * The + comm_size is the recvcounts array, which must also be present
275      */
276     if (action.size() > 3 + comm_size + comm_size) {
277       // datatype + disp are specified
278       datatype1  = parse_datatype(action, 3 + comm_size);
279       datatype2  = parse_datatype(action, 4 + comm_size);
280       disp_index = 5 + comm_size;
281     } else if (action.size() > 3 + comm_size + 2) {
282       // disps specified; datatype is not specified; use the default one
283       datatype1  = MPI_DEFAULT_TYPE;
284       datatype2  = MPI_DEFAULT_TYPE;
285       disp_index = 3 + comm_size;
286     } else {
287       // no disp specified, maybe only datatype,
288       datatype1 = parse_datatype(action, 3 + comm_size);
289       datatype2 = parse_datatype(action, 4 + comm_size);
290     }
291
292     if (disp_index != 0) {
293       xbt_assert(disp_index + comm_size <= action.size());
294       for (unsigned i = 0; i < comm_size; i++)
295         disps[i]          = std::stoi(action[disp_index + i]);
296     }
297   }
298
299   for (unsigned int i = 0; i < comm_size; i++) {
300     (*recvcounts)[i] = std::stoi(action[i + 3]);
301   }
302   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
303 }
304
305 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
306 {
307   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
308         0 gather 68 68 0 0 0
309       where:
310         1) 68 is the sendcounts
311         2) 68 is the recvcounts
312         3) 0 is the root node
313         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
314         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
315   */
316   CHECK_ACTION_PARAMS(action, 2, 3)
317   comm_size = MPI_COMM_WORLD->size();
318   send_size = parse_double(action[2]);
319   recv_size = parse_double(action[3]);
320   root      = parse_root(action, 4);
321   datatype1 = parse_datatype(action, 5);
322   datatype2 = parse_datatype(action, 6);
323 }
324
325 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
326 {
327   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
328      0 gather 68 10 10 10 68 0 0 0
329       where:
330       1) 68 10 10 10 is the sendcounts
331       2) 68 is the recvcount
332       3) 0 is the root node
333       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
334       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
335   */
336   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
337   recv_size  = parse_double(action[2 + comm_size]);
338   disps      = std::vector<int>(comm_size, 0);
339   sendcounts = std::make_shared<std::vector<int>>(comm_size);
340
341   root      = parse_root(action, 3 + comm_size);
342   datatype1 = parse_datatype(action, 4 + comm_size);
343   datatype2 = parse_datatype(action, 5 + comm_size);
344
345   for (unsigned int i = 0; i < comm_size; i++) {
346     (*sendcounts)[i] = std::stoi(action[i + 2]);
347   }
348   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
349 }
350
351 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
352 {
353   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
354        0 reducescatter 275427 275427 275427 204020 11346849 0
355      where:
356        1) The first four values after the name of the action declare the recvcounts array
357        2) The value 11346849 is the amount of instructions
358        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
359   */
360   comm_size = MPI_COMM_WORLD->size();
361   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
362   comp_size  = parse_double(action[2 + comm_size]);
363   recvcounts = std::make_shared<std::vector<int>>(comm_size);
364   datatype1  = parse_datatype(action, 3 + comm_size);
365
366   for (unsigned int i = 0; i < comm_size; i++) {
367     recvcounts->push_back(std::stoi(action[i + 2]));
368   }
369   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
370 }
371
372 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
373 {
374   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
375         0 alltoallv 100 1 7 10 12 100 1 70 10 5
376      where:
377       1) 100 is the size of the send buffer *sizeof(int),
378       2) 1 7 10 12 is the sendcounts array
379       3) 100*sizeof(int) is the size of the receiver buffer
380       4)  1 70 10 5 is the recvcounts array
381   */
382   comm_size = MPI_COMM_WORLD->size();
383   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
384   sendcounts = std::make_shared<std::vector<int>>(comm_size);
385   recvcounts = std::make_shared<std::vector<int>>(comm_size);
386   senddisps  = std::vector<int>(comm_size, 0);
387   recvdisps  = std::vector<int>(comm_size, 0);
388
389   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
390   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
391
392   send_buf_size = parse_double(action[2]);
393   recv_buf_size = parse_double(action[3 + comm_size]);
394   for (unsigned int i = 0; i < comm_size; i++) {
395     (*sendcounts)[i] = std::stoi(action[3 + i]);
396     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
397   }
398   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
399   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
400 }
401
402 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
403 {
404   std::string s = boost::algorithm::join(action, " ");
405   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
406   const WaitTestParser& args = get_args();
407   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
408   req_storage.remove(request);
409
410   if (request == MPI_REQUEST_NULL) {
411     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
412      * return.*/
413     return;
414   }
415
416   aid_t rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
417
418   // Must be taken before Request::wait() since the request may be set to
419   // MPI_REQUEST_NULL by Request::wait!
420   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
421   // TODO: Here we take the rank while we normally take the process id (look for get_pid())
422   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
423
424   MPI_Status status;
425   Request::wait(&request, &status);
426
427   TRACE_smpi_comm_out(rank);
428   if (is_wait_for_receive)
429     TRACE_smpi_recv(args.src, args.dst, args.tag);
430 }
431
432 void SendAction::kernel(simgrid::xbt::ReplayAction&)
433 {
434   const SendRecvParser& args = get_args();
435   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
436
437   TRACE_smpi_comm_in(
438       get_pid(), __func__,
439       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
440   if (not TRACE_smpi_view_internals())
441     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
442
443   if (get_name() == "send") {
444     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
445   } else if (get_name() == "isend") {
446     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447     req_storage.add(request);
448   } else {
449     xbt_die("Don't know this action, %s", get_name().c_str());
450   }
451
452   TRACE_smpi_comm_out(get_pid());
453 }
454
455 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
456 {
457   const SendRecvParser& args = get_args();
458   TRACE_smpi_comm_in(
459       get_pid(), __func__,
460       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
461
462   MPI_Status status;
463   // unknown size from the receiver point of view
464   double arg_size = args.size;
465   if (arg_size <= 0.0) {
466     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
467     arg_size = status.count;
468   }
469
470   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
471   if (get_name() == "recv") {
472     is_recv = true;
473     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
474   } else if (get_name() == "irecv") {
475     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
476     req_storage.add(request);
477   } else {
478     THROW_IMPOSSIBLE;
479   }
480
481   TRACE_smpi_comm_out(get_pid());
482   if (is_recv && not TRACE_smpi_view_internals()) {
483     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
484     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
485   }
486 }
487
488 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
489 {
490   const ComputeParser& args = get_args();
491   if (smpi_cfg_simulate_computation()) {
492     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
493   }
494 }
495
496 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
497 {
498   const SleepParser& args = get_args();
499   XBT_DEBUG("Sleep for: %lf secs", args.time);
500   aid_t pid = simgrid::s4u::this_actor::get_pid();
501   TRACE_smpi_sleeping_in(pid, args.time);
502   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
503   TRACE_smpi_sleeping_out(pid);
504 }
505
506 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
507 {
508   const LocationParser& args = get_args();
509   smpi_trace_set_call_location(args.filename.c_str(), args.line);
510 }
511
512 void TestAction::kernel(simgrid::xbt::ReplayAction&)
513 {
514   const WaitTestParser& args = get_args();
515   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
516   req_storage.remove(request);
517   // if request is null here, this may mean that a previous test has succeeded
518   // Different times in traced application and replayed version may lead to this
519   // In this case, ignore the extra calls.
520   if (request != MPI_REQUEST_NULL) {
521     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
522
523     MPI_Status status;
524     int flag = 0;
525     Request::test(&request, &status, &flag);
526
527     XBT_DEBUG("MPI_Test result: %d", flag);
528     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
529      * nullptr.*/
530     if (request == MPI_REQUEST_NULL)
531       req_storage.addNullRequest(args.src, args.dst, args.tag);
532     else
533       req_storage.add(request);
534
535     TRACE_smpi_comm_out(get_pid());
536   }
537 }
538
539 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
540 {
541   CHECK_ACTION_PARAMS(action, 0, 1)
542     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
543     : MPI_BYTE;  // default TAU datatype
544
545   /* start a simulated timer */
546   smpi_process()->simulated_start();
547 }
548
549 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
550 {
551   /* nothing to do */
552 }
553
554 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
555 {
556   const size_t count_requests = req_storage.size();
557
558   if (count_requests > 0) {
559     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
560     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
561     std::vector<MPI_Request> reqs;
562     req_storage.get_requests(reqs);
563     for (auto const& req : reqs) {
564       if (req && (req->flags() & MPI_REQ_RECV)) {
565         sender_receiver.emplace_back(req->src(), req->dst());
566       }
567     }
568     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
569     req_storage.get_store().clear();
570
571     for (auto const& pair : sender_receiver) {
572       TRACE_smpi_recv(pair.first, pair.second, 0);
573     }
574     TRACE_smpi_comm_out(get_pid());
575   }
576 }
577
578 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
579 {
580   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
581   colls::barrier(MPI_COMM_WORLD);
582   TRACE_smpi_comm_out(get_pid());
583 }
584
585 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
586 {
587   const BcastArgParser& args = get_args();
588   TRACE_smpi_comm_in(get_pid(), "action_bcast",
589                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root), -1.0, args.size,
590                                                     0, Datatype::encode(args.datatype1), ""));
591
592   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
593
594   TRACE_smpi_comm_out(get_pid());
595 }
596
597 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
598 {
599   const ReduceArgParser& args = get_args();
600   TRACE_smpi_comm_in(get_pid(), "action_reduce",
601                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root), args.comp_size,
602                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
603
604   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
605                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
606                 args.root, MPI_COMM_WORLD);
607   private_execute_flops(args.comp_size);
608
609   TRACE_smpi_comm_out(get_pid());
610 }
611
612 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
613 {
614   const AllReduceArgParser& args = get_args();
615   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
616                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
617                                                     Datatype::encode(args.datatype1), ""));
618
619   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
620                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
621                    MPI_COMM_WORLD);
622   private_execute_flops(args.comp_size);
623
624   TRACE_smpi_comm_out(get_pid());
625 }
626
627 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
628 {
629   const AllToAllArgParser& args = get_args();
630   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
631                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
632                                                     Datatype::encode(args.datatype1),
633                                                     Datatype::encode(args.datatype2)));
634
635   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
636                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
637                   MPI_COMM_WORLD);
638
639   TRACE_smpi_comm_out(get_pid());
640 }
641
642 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
643 {
644   const GatherArgParser& args = get_args();
645   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
646                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
647                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
648                                                     Datatype::encode(args.datatype2)));
649
650   if (get_name() == "gather") {
651     int rank = MPI_COMM_WORLD->rank();
652     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
653                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
654                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
655   } else
656     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
657                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
658                      MPI_COMM_WORLD);
659
660   TRACE_smpi_comm_out(get_pid());
661 }
662
663 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
664 {
665   int rank = MPI_COMM_WORLD->rank();
666   const GatherVArgParser& args = get_args();
667   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
668                      new simgrid::instr::VarCollTIData(
669                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
670                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
671
672   if (get_name() == "gatherv") {
673     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
674                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
675                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
676   } else {
677     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
678                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
679                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
680   }
681
682   TRACE_smpi_comm_out(get_pid());
683 }
684
685 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
686 {
687   int rank = MPI_COMM_WORLD->rank();
688   const ScatterArgParser& args = get_args();
689   TRACE_smpi_comm_in(get_pid(), "action_scatter",
690                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
691                                                     Datatype::encode(args.datatype1),
692                                                     Datatype::encode(args.datatype2)));
693
694   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
695                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
696                  args.datatype2, args.root, MPI_COMM_WORLD);
697
698   TRACE_smpi_comm_out(get_pid());
699 }
700
701 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
702 {
703   int rank = MPI_COMM_WORLD->rank();
704   const ScatterVArgParser& args = get_args();
705   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
706                      new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
707                                                        nullptr, Datatype::encode(args.datatype1),
708                                                        Datatype::encode(args.datatype2)));
709
710   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
711                   args.sendcounts->data(), args.disps.data(), args.datatype1,
712                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
713                   MPI_COMM_WORLD);
714
715   TRACE_smpi_comm_out(get_pid());
716 }
717
718 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
719 {
720   const ReduceScatterArgParser& args = get_args();
721   TRACE_smpi_comm_in(
722       get_pid(), "action_reducescatter",
723       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
724                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
725                                         Datatype::encode(args.datatype1)));
726
727   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
728                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
729                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
730
731   private_execute_flops(args.comp_size);
732   TRACE_smpi_comm_out(get_pid());
733 }
734
735 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
736 {
737   const AllToAllVArgParser& args = get_args();
738   TRACE_smpi_comm_in(get_pid(), __func__,
739                      new simgrid::instr::VarCollTIData(
740                          "alltoallv", 0, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
741                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
742
743   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
744                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
745                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
746
747   TRACE_smpi_comm_out(get_pid());
748 }
749 } // Replay Namespace
750 }} // namespace simgrid::smpi
751
752 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
753 /** @brief Only initialize the replay, don't do it for real */
754 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
755 {
756   xbt_assert(not smpi_process()->initializing());
757
758   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
759   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
760   simgrid::smpi::ActorExt::init();
761
762   smpi_process()->mark_as_initialized();
763   smpi_process()->set_replaying(true);
764
765   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
766   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
767   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
768   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
769   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
770   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
771   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
772   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
773   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
774   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
775   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
776   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
779   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
780   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
781   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
782   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
783   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
784   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
785   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
786   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
787   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
788   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
789   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
790   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
791   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
792   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
793   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
794
795   //if we have a delayed start, sleep here.
796   if (start_delay_flops > 0) {
797     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
798     private_execute_flops(start_delay_flops);
799   } else {
800     // Wait for the other actors to initialize also
801     simgrid::s4u::this_actor::yield();
802   }
803 }
804
805 /** @brief actually run the replay after initialization */
806 void smpi_replay_main(int rank, const char* private_trace_filename)
807 {
808   static int active_processes = 0;
809   active_processes++;
810   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
811   std::string rank_string                      = std::to_string(rank);
812   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
813
814   /* and now, finalize everything */
815   /* One active process will stop. Decrease the counter*/
816   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
817   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
818   if (count_requests > 0) {
819     std::vector<MPI_Request> requests(count_requests);
820     unsigned int i=0;
821
822     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
823       requests[i] = pair.second;
824       i++;
825     }
826     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
827   }
828   active_processes--;
829
830   if(active_processes==0){
831     /* Last process alive speaking: end the simulated timer */
832     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
833     smpi_free_replay_tmp_buffers();
834   }
835
836   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
837                      new simgrid::instr::NoOpTIData("finalize"));
838
839   smpi_process()->finalize();
840
841   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
842 }
843
844 /** @brief chain a replay initialization and a replay start */
845 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
846 {
847   smpi_replay_init(instance_id, rank, start_delay_flops);
848   smpi_replay_main(rank, private_trace_filename);
849 }