Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix issues in replay of scatter and scatterv
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <cmath>
16 #include <limits>
17 #include <memory>
18 #include <numeric>
19 #include <tuple>
20 #include <unordered_map>
21 #include <vector>
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24
25 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
26 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
27 // this could go into a header file.
28 namespace hash_tuple {
29 template <typename TT> class hash {
30 public:
31   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
32 };
33
34 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
35 {
36   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
37 }
38
39 // Recursive template code derived from Matthieu M.
40 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
41 public:
42   static void apply(size_t& seed, Tuple const& tuple)
43   {
44     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
45     hash_combine(seed, std::get<Index>(tuple));
46   }
47 };
48
49 template <class Tuple> class HashValueImpl<Tuple, 0> {
50 public:
51   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
52 };
53
54 template <typename... TT> class hash<std::tuple<TT...>> {
55 public:
56   size_t operator()(std::tuple<TT...> const& tt) const
57   {
58     size_t seed = 0;
59     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
60     return seed;
61   }
62 };
63 }
64
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
66 {
67   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68     std::string s = boost::algorithm::join(action, " ");
69     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
70   }
71 }
72
73 /* Helper functions */
74 static double parse_double(const std::string& string)
75 {
76   return xbt_str_parse_double(string.c_str(), "not a double");
77 }
78
79 template <typename T> static T parse_integer(const std::string& string)
80 {
81   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
82   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
83                  val <= static_cast<double>(std::numeric_limits<T>::max()),
84              "out of range: %g", val);
85   return static_cast<T>(val);
86 }
87
88 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
89 {
90   return i < action.size() ? std::stoi(action[i]) : 0;
91 }
92
93 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
94 {
95   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
96 }
97
98 namespace simgrid {
99 namespace smpi {
100
101 namespace replay {
102 MPI_Datatype MPI_DEFAULT_TYPE;
103
104 class RequestStorage {
105 private:
106   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
107   using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
108
109   req_storage_t store;
110
111 public:
112   RequestStorage() = default;
113   size_t size() const { return store.size(); }
114
115   req_storage_t& get_store() { return store; }
116
117   void get_requests(std::vector<MPI_Request>& vec) const
118   {
119     for (auto const& pair : store) {
120       auto& req       = pair.second;
121       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
122       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123         vec.push_back(pair.second);
124         pair.second->print_request("MM");
125       }
126     }
127   }
128
129     MPI_Request find(int src, int dst, int tag)
130     {
131       auto it = store.find(req_key_t(src, dst, tag));
132       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
133     }
134
135     void remove(const Request* req)
136     {
137       if (req == MPI_REQUEST_NULL) return;
138
139       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
140     }
141
142     void add(MPI_Request req)
143     {
144       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
145         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
146     }
147
148     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149     void addNullRequest(int src, int dst, int tag)
150     {
151       store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
152                     MPI_REQUEST_NULL});
153     }
154 };
155
156 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
157 {
158   CHECK_ACTION_PARAMS(action, 3, 0)
159   src = std::stoi(action[2]);
160   dst = std::stoi(action[3]);
161   tag = std::stoi(action[4]);
162 }
163
164 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
165 {
166   CHECK_ACTION_PARAMS(action, 3, 1)
167   partner = std::stoi(action[2]);
168   tag     = std::stoi(action[3]);
169   size      = parse_integer<size_t>(action[4]);
170   datatype1 = parse_datatype(action, 5);
171 }
172
173 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
174 {
175   CHECK_ACTION_PARAMS(action, 1, 0)
176   flops = parse_double(action[2]);
177 }
178
179 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
180 {
181   CHECK_ACTION_PARAMS(action, 1, 0)
182   time = parse_double(action[2]);
183 }
184
185 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
186 {
187   CHECK_ACTION_PARAMS(action, 2, 0)
188   filename = std::string(action[2]);
189   line = std::stoi(action[3]);
190 }
191
192 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
193 {
194   CHECK_ACTION_PARAMS(action, 1, 2)
195   size      = parse_integer<size_t>(action[2]);
196   root      = parse_root(action, 3);
197   datatype1 = parse_datatype(action, 4);
198 }
199
200 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 2)
203   comm_size = parse_integer<unsigned>(action[2]);
204   comp_size = parse_double(action[3]);
205   root      = parse_root(action, 4);
206   datatype1 = parse_datatype(action, 5);
207 }
208
209 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   comm_size = parse_integer<unsigned>(action[2]);
213   comp_size = parse_double(action[3]);
214   datatype1 = parse_datatype(action, 4);
215 }
216
217 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
218 {
219   CHECK_ACTION_PARAMS(action, 2, 1)
220   comm_size = MPI_COMM_WORLD->size();
221   send_size = parse_integer<int>(action[2]);
222   recv_size = parse_integer<int>(action[3]);
223   datatype1 = parse_datatype(action, 4);
224   datatype2 = parse_datatype(action, 5);
225 }
226
227 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
228 {
229   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
230         0 gather 68 68 0 0 0
231       where:
232         1) 68 is the sendcounts
233         2) 68 is the recvcounts
234         3) 0 is the root node
235         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
236         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
237   */
238   CHECK_ACTION_PARAMS(action, 2, 3)
239   comm_size = MPI_COMM_WORLD->size();
240   send_size = parse_integer<int>(action[2]);
241   recv_size = parse_integer<int>(action[3]);
242
243   if (name == "gather") {
244     root      = parse_root(action, 4);
245     datatype1 = parse_datatype(action, 5);
246     datatype2 = parse_datatype(action, 6);
247   } else {
248     root      = 0;
249     datatype1 = parse_datatype(action, 4);
250     datatype2 = parse_datatype(action, 5);
251   }
252 }
253
254 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
255 {
256   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
257        0 gather 68 68 10 10 10 0 0 0
258      where:
259        1) 68 is the sendcount
260        2) 68 10 10 10 is the recvcounts
261        3) 0 is the root node
262        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
263        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
264   */
265   comm_size = MPI_COMM_WORLD->size();
266   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
267   send_size  = parse_integer<int>(action[2]);
268   disps      = std::vector<int>(comm_size, 0);
269   recvcounts = std::make_shared<std::vector<int>>(comm_size);
270
271   if (name == "gatherv") {
272     root      = parse_root(action, 3 + comm_size);
273     datatype1 = parse_datatype(action, 4 + comm_size);
274     datatype2 = parse_datatype(action, 5 + comm_size);
275   } else {
276     root                = 0;
277     unsigned disp_index = 0;
278     /* The 3 comes from "0 gather <sendcount>", which must always be present.
279      * The + comm_size is the recvcounts array, which must also be present
280      */
281     if (action.size() > 3 + comm_size + comm_size) {
282       // datatype + disp are specified
283       datatype1  = parse_datatype(action, 3 + comm_size);
284       datatype2  = parse_datatype(action, 4 + comm_size);
285       disp_index = 5 + comm_size;
286     } else if (action.size() > 3 + comm_size + 2) {
287       // disps specified; datatype is not specified; use the default one
288       datatype1  = MPI_DEFAULT_TYPE;
289       datatype2  = MPI_DEFAULT_TYPE;
290       disp_index = 3 + comm_size;
291     } else {
292       // no disp specified, maybe only datatype,
293       datatype1 = parse_datatype(action, 3 + comm_size);
294       datatype2 = parse_datatype(action, 4 + comm_size);
295     }
296
297     if (disp_index != 0) {
298       xbt_assert(disp_index + comm_size <= action.size());
299       for (unsigned i = 0; i < comm_size; i++)
300         disps[i]          = std::stoi(action[disp_index + i]);
301     }
302   }
303
304   for (unsigned int i = 0; i < comm_size; i++) {
305     (*recvcounts)[i] = std::stoi(action[i + 3]);
306   }
307   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
308 }
309
310 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
311 {
312   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
313         0 gather 68 68 0 0 0
314       where:
315         1) 68 is the sendcounts
316         2) 68 is the recvcounts
317         3) 0 is the root node
318         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
319         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
320   */
321   comm_size = MPI_COMM_WORLD->size();
322   CHECK_ACTION_PARAMS(action, 2, 3)
323   comm_size = MPI_COMM_WORLD->size();
324   send_size = parse_integer<int>(action[2]);
325   recv_size = parse_integer<int>(action[3]);
326   root      = parse_root(action, 4);
327   datatype1 = parse_datatype(action, 5);
328   datatype2 = parse_datatype(action, 6);
329 }
330
331 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
332 {
333   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
334      0 gather 68 10 10 10 68 0 0 0
335       where:
336       1) 68 10 10 10 is the sendcounts
337       2) 68 is the recvcount
338       3) 0 is the root node
339       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
340       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
341   */
342   comm_size = MPI_COMM_WORLD->size();
343   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
344   recv_size  = parse_integer<int>(action[2 + comm_size]);
345   disps      = std::vector<int>(comm_size, 0);
346   sendcounts = std::make_shared<std::vector<int>>(comm_size);
347
348   root      = parse_root(action, 3 + comm_size);
349   datatype1 = parse_datatype(action, 4 + comm_size);
350   datatype2 = parse_datatype(action, 5 + comm_size);
351
352   for (unsigned int i = 0; i < comm_size; i++) {
353     (*sendcounts)[i] = std::stoi(action[i + 2]);
354   }
355   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
356 }
357
358 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
359 {
360   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
361        0 reducescatter 275427 275427 275427 204020 11346849 0
362      where:
363        1) The first four values after the name of the action declare the recvcounts array
364        2) The value 11346849 is the amount of instructions
365        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
366   */
367   comm_size = MPI_COMM_WORLD->size();
368   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
369   comp_size  = parse_double(action[2 + comm_size]);
370   recvcounts = std::make_shared<std::vector<int>>(comm_size);
371   datatype1  = parse_datatype(action, 3 + comm_size);
372
373   for (unsigned int i = 0; i < comm_size; i++) {
374     recvcounts->push_back(std::stoi(action[i + 2]));
375   }
376   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
377 }
378
379 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
380 {
381   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
382         0 alltoallv 100 1 7 10 12 100 1 70 10 5
383      where:
384       1) 100 is the size of the send buffer *sizeof(int),
385       2) 1 7 10 12 is the sendcounts array
386       3) 100*sizeof(int) is the size of the receiver buffer
387       4)  1 70 10 5 is the recvcounts array
388   */
389   comm_size = MPI_COMM_WORLD->size();
390   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
391   sendcounts = std::make_shared<std::vector<int>>(comm_size);
392   recvcounts = std::make_shared<std::vector<int>>(comm_size);
393   senddisps  = std::vector<int>(comm_size, 0);
394   recvdisps  = std::vector<int>(comm_size, 0);
395
396   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
397   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
398
399   send_buf_size = parse_integer<int>(action[2]);
400   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
401   for (unsigned int i = 0; i < comm_size; i++) {
402     (*sendcounts)[i] = std::stoi(action[3 + i]);
403     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
404   }
405   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
406   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
407 }
408
409 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
410 {
411   std::string s = boost::algorithm::join(action, " ");
412   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
413   const WaitTestParser& args = get_args();
414   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
415   req_storage.remove(request);
416
417   if (request == MPI_REQUEST_NULL) {
418     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
419      * return.*/
420     return;
421   }
422
423   // Must be taken before Request::wait() since the request may be set to
424   // MPI_REQUEST_NULL by Request::wait!
425   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
426
427   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
428
429   MPI_Status status;
430   Request::wait(&request, &status);
431
432   TRACE_smpi_comm_out(get_pid());
433   if (is_wait_for_receive)
434     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
435 }
436
437 void SendAction::kernel(simgrid::xbt::ReplayAction&)
438 {
439   const SendRecvParser& args = get_args();
440   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
441
442   TRACE_smpi_comm_in(
443       get_pid(), __func__,
444       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
445   if (not TRACE_smpi_view_internals())
446     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
447
448   if (get_name() == "send") {
449     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450   } else if (get_name() == "isend") {
451     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452     req_storage.add(request);
453   } else {
454     xbt_die("Don't know this action, %s", get_name().c_str());
455   }
456
457   TRACE_smpi_comm_out(get_pid());
458 }
459
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
461 {
462   const SendRecvParser& args = get_args();
463   TRACE_smpi_comm_in(
464       get_pid(), __func__,
465       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
466
467   MPI_Status status;
468   // unknown size from the receiver point of view
469   size_t arg_size = args.size;
470   if (arg_size == 0) {
471     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
472     arg_size = status.count;
473   }
474
475   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
476   if (get_name() == "recv") {
477     is_recv = true;
478     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
479   } else if (get_name() == "irecv") {
480     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
481     req_storage.add(request);
482   } else {
483     THROW_IMPOSSIBLE;
484   }
485
486   TRACE_smpi_comm_out(get_pid());
487   if (is_recv && not TRACE_smpi_view_internals()) {
488     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
489     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
490   }
491 }
492
493 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
494 {
495   const ComputeParser& args = get_args();
496   if (smpi_cfg_simulate_computation()) {
497     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
498   }
499 }
500
501 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
502 {
503   const SleepParser& args = get_args();
504   XBT_DEBUG("Sleep for: %lf secs", args.time);
505   aid_t pid = simgrid::s4u::this_actor::get_pid();
506   TRACE_smpi_sleeping_in(pid, args.time);
507   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
508   TRACE_smpi_sleeping_out(pid);
509 }
510
511 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
512 {
513   const LocationParser& args = get_args();
514   smpi_trace_set_call_location(args.filename.c_str(), args.line);
515 }
516
517 void TestAction::kernel(simgrid::xbt::ReplayAction&)
518 {
519   const WaitTestParser& args = get_args();
520   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
521   req_storage.remove(request);
522   // if request is null here, this may mean that a previous test has succeeded
523   // Different times in traced application and replayed version may lead to this
524   // In this case, ignore the extra calls.
525   if (request != MPI_REQUEST_NULL) {
526     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
527
528     MPI_Status status;
529     int flag = 0;
530     Request::test(&request, &status, &flag);
531
532     XBT_DEBUG("MPI_Test result: %d", flag);
533     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
534      * nullptr.*/
535     if (request == MPI_REQUEST_NULL)
536       req_storage.addNullRequest(args.src, args.dst, args.tag);
537     else
538       req_storage.add(request);
539
540     TRACE_smpi_comm_out(get_pid());
541   }
542 }
543
544 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
545 {
546   CHECK_ACTION_PARAMS(action, 0, 1)
547     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
548     : MPI_BYTE;  // default TAU datatype
549
550   /* start a simulated timer */
551   smpi_process()->simulated_start();
552 }
553
554 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
555 {
556   /* nothing to do */
557 }
558
559 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
560 {
561   const size_t count_requests = req_storage.size();
562
563   if (count_requests > 0) {
564     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
565     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
566     std::vector<MPI_Request> reqs;
567     req_storage.get_requests(reqs);
568     for (auto const& req : reqs) {
569       if (req && (req->flags() & MPI_REQ_RECV)) {
570         sender_receiver.emplace_back(req->src(), req->dst());
571       }
572     }
573     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
574     req_storage.get_store().clear();
575
576     for (auto const& pair : sender_receiver) {
577       TRACE_smpi_recv(pair.first, pair.second, 0);
578     }
579     TRACE_smpi_comm_out(get_pid());
580   }
581 }
582
583 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
584 {
585   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
586   colls::barrier(MPI_COMM_WORLD);
587   TRACE_smpi_comm_out(get_pid());
588 }
589
590 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
591 {
592   const BcastArgParser& args = get_args();
593   TRACE_smpi_comm_in(get_pid(), "action_bcast",
594                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
595                                                     0, Datatype::encode(args.datatype1), ""));
596
597   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
598
599   TRACE_smpi_comm_out(get_pid());
600 }
601
602 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
603 {
604   const ReduceArgParser& args = get_args();
605   TRACE_smpi_comm_in(get_pid(), "action_reduce",
606                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
607                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
608
609   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
610                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
611                 args.root, MPI_COMM_WORLD);
612   if(args.comp_size != 0.0)
613     private_execute_flops(args.comp_size);
614
615   TRACE_smpi_comm_out(get_pid());
616 }
617
618 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
619 {
620   const AllReduceArgParser& args = get_args();
621   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
622                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
623                                                     Datatype::encode(args.datatype1), ""));
624
625   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
626                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
627                    MPI_COMM_WORLD);
628   if(args.comp_size != 0.0)
629     private_execute_flops(args.comp_size);
630
631   TRACE_smpi_comm_out(get_pid());
632 }
633
634 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
635 {
636   const AllToAllArgParser& args = get_args();
637   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
638                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
639                                                     Datatype::encode(args.datatype1),
640                                                     Datatype::encode(args.datatype2)));
641
642   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
643                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
644                   MPI_COMM_WORLD);
645
646   TRACE_smpi_comm_out(get_pid());
647 }
648
649 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
650 {
651   const GatherArgParser& args = get_args();
652   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
653                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
654                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
655                                                     Datatype::encode(args.datatype2)));
656
657   if (get_name() == "gather") {
658     int rank = MPI_COMM_WORLD->rank();
659     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
660                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
661                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
662   } else
663     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
664                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
665                      MPI_COMM_WORLD);
666
667   TRACE_smpi_comm_out(get_pid());
668 }
669
670 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
671 {
672   int rank = MPI_COMM_WORLD->rank();
673   const GatherVArgParser& args = get_args();
674   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
675                      new simgrid::instr::VarCollTIData(
676                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
677                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
678
679   if (get_name() == "gatherv") {
680     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
681                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
682                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
683   } else {
684     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
685                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
686                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
687   }
688
689   TRACE_smpi_comm_out(get_pid());
690 }
691
692 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
693 {
694   int rank = MPI_COMM_WORLD->rank();
695   const ScatterArgParser& args = get_args();
696   TRACE_smpi_comm_in(get_pid(), "action_scatter",
697                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
698                                                     Datatype::encode(args.datatype1),
699                                                     Datatype::encode(args.datatype2)));
700
701   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
702                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
703                  args.datatype2, args.root, MPI_COMM_WORLD);
704
705   TRACE_smpi_comm_out(get_pid());
706 }
707
708 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
709 {
710   int rank = MPI_COMM_WORLD->rank();
711   const ScatterVArgParser& args = get_args();
712   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
713                      new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
714                                                        nullptr, Datatype::encode(args.datatype1),
715                                                        Datatype::encode(args.datatype2)));
716
717   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
718                   args.sendcounts->data(), args.disps.data(), args.datatype1,
719                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
720                   MPI_COMM_WORLD);
721
722   TRACE_smpi_comm_out(get_pid());
723 }
724
725 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
726 {
727   const ReduceScatterArgParser& args = get_args();
728   TRACE_smpi_comm_in(
729       get_pid(), "action_reducescatter",
730       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
731                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
732                                         Datatype::encode(args.datatype1)));
733
734   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
735                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
736                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
737
738   private_execute_flops(args.comp_size);
739   TRACE_smpi_comm_out(get_pid());
740 }
741
742 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
743 {
744   const AllToAllVArgParser& args = get_args();
745   TRACE_smpi_comm_in(get_pid(), __func__,
746                      new simgrid::instr::VarCollTIData(
747                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
748                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
749
750   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
751                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
752                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
753
754   TRACE_smpi_comm_out(get_pid());
755 }
756 } // Replay Namespace
757 }} // namespace simgrid::smpi
758
759 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
760 /** @brief Only initialize the replay, don't do it for real */
761 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
762 {
763   xbt_assert(not smpi_process()->initializing());
764
765   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
766   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
767   simgrid::smpi::ActorExt::init();
768
769   smpi_process()->mark_as_initialized();
770   smpi_process()->set_replaying(true);
771
772   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
773   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
774   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
775   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
777   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
778   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
784   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
785   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
786   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
787   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
788   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
789   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
790   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
791   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
792   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
793   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
794   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
795   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
796   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
797   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
798   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
799   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
800   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
801
802   //if we have a delayed start, sleep here.
803   if (start_delay_flops > 0) {
804     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
805     private_execute_flops(start_delay_flops);
806   } else {
807     // Wait for the other actors to initialize also
808     simgrid::s4u::this_actor::yield();
809   }
810 }
811
812 /** @brief actually run the replay after initialization */
813 void smpi_replay_main(int rank, const char* private_trace_filename)
814 {
815   static int active_processes = 0;
816   active_processes++;
817   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
818   std::string rank_string                      = std::to_string(rank);
819   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
820
821   /* and now, finalize everything */
822   /* One active process will stop. Decrease the counter*/
823   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
824   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
825   if (count_requests > 0) {
826     std::vector<MPI_Request> requests(count_requests);
827     unsigned int i=0;
828
829     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
830       requests[i] = pair.second;
831       i++;
832     }
833     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
834   }
835
836   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
837     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
838
839   active_processes--;
840
841   if(active_processes==0){
842     /* Last process alive speaking: end the simulated timer */
843     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
844     smpi_free_replay_tmp_buffers();
845   }
846
847   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
848                      new simgrid::instr::NoOpTIData("finalize"));
849
850   smpi_process()->finalize();
851
852   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
853 }
854
855 /** @brief chain a replay initialization and a replay start */
856 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
857 {
858   smpi_replay_init(instance_id, rank, start_delay_flops);
859   smpi_replay_main(rank, private_trace_filename);
860 }