Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use boost::lexical_cast instead of xbt_str_parse_* in C++ files.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16 #include "xbt/str.h"
17
18 #include <boost/lexical_cast.hpp>
19 #include <cmath>
20 #include <limits>
21 #include <memory>
22 #include <numeric>
23 #include <tuple>
24 #include <unordered_map>
25 #include <vector>
26
27 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
28 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
29 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
30 // this could go into a header file.
31 namespace hash_tuple {
32 template <typename TT> class hash {
33 public:
34   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
35 };
36
37 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
38 {
39   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
40 }
41
42 // Recursive template code derived from Matthieu M.
43 template <class Tuple, size_t Index = std::tuple_size_v<Tuple> - 1> class HashValueImpl {
44 public:
45   static void apply(size_t& seed, Tuple const& tuple)
46   {
47     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
48     hash_combine(seed, std::get<Index>(tuple));
49   }
50 };
51
52 template <class Tuple> class HashValueImpl<Tuple, 0> {
53 public:
54   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
55 };
56
57 template <typename... TT> class hash<std::tuple<TT...>> {
58 public:
59   size_t operator()(std::tuple<TT...> const& tt) const
60   {
61     size_t seed = 0;
62     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
63     return seed;
64   }
65 };
66 }
67
68 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 {
70   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
71     std::string s = boost::algorithm::join(action, " ");
72     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
73   }
74 }
75
76 /* Helper functions */
77 static double parse_double(const std::string& string)
78 {
79   try {
80     return boost::lexical_cast<double>(string);
81   } catch (boost::bad_lexical_cast const&) {
82     throw std::invalid_argument("not a double: " + string);
83   }
84 }
85
86 template <typename T> static T parse_integer(const std::string& string)
87 {
88   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
89   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
90                  val <= static_cast<double>(std::numeric_limits<T>::max()),
91              "out of range: %g", val);
92   return static_cast<T>(val);
93 }
94
95 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
96 {
97   return i < action.size() ? std::stoi(action[i]) : 0;
98 }
99
100 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
101 {
102   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
103 }
104
105 namespace simgrid::smpi::replay {
106 MPI_Datatype MPI_DEFAULT_TYPE;
107
108 class RequestStorage {
109 private:
110   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
111   using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
112
113   req_storage_t store;
114
115 public:
116   RequestStorage() = default;
117   size_t size() const { return store.size(); }
118
119   req_storage_t& get_store() { return store; }
120
121   void get_requests(std::vector<MPI_Request>& vec) const
122   {
123     for (auto const& [_, reqs] : store) {
124       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
125       for (const auto& req : reqs) {
126         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
127           vec.push_back(req);
128           req->print_request("MM");
129         }
130       }
131     }
132   }
133
134   MPI_Request pop(int src, int dst, int tag)
135   {
136     auto it = store.find(req_key_t(src, dst, tag));
137     if (it == store.end())
138      return MPI_REQUEST_NULL;
139     MPI_Request req = it->second.front();
140     it->second.pop_front();
141     if(it->second.empty())
142       store.erase(req_key_t(src, dst, tag));
143     return req;
144   }
145
146   void add(MPI_Request req)
147   {
148     if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
149       store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
150     }
151   }
152
153   /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
154   void addNullRequest(int src, int dst, int tag)
155   {
156     int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
157     int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
158     store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
159   }
160 };
161
162 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 3, 0)
165   src = std::stoi(action[2]);
166   dst = std::stoi(action[3]);
167   tag = std::stoi(action[4]);
168 }
169
170 void SendOrRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
171 {
172   CHECK_ACTION_PARAMS(action, 3, 1)
173   partner = std::stoi(action[2]);
174   tag     = std::stoi(action[3]);
175   size      = parse_integer<ssize_t>(action[4]);
176   datatype1 = parse_datatype(action, 5);
177 }
178
179 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
180 {
181   CHECK_ACTION_PARAMS(action, 1, 0)
182   flops = parse_double(action[2]);
183 }
184
185 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
186 {
187   CHECK_ACTION_PARAMS(action, 1, 0)
188   time = parse_double(action[2]);
189 }
190
191 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
192 {
193   CHECK_ACTION_PARAMS(action, 2, 0)
194   filename = action[2];
195   line = std::stoi(action[3]);
196 }
197
198 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
199 {
200   CHECK_ACTION_PARAMS(action, 6, 0)
201   sendcount = parse_integer<int>(action[2]);
202   dst = std::stoi(action[3]);
203   recvcount = parse_integer<int>(action[4]);
204   src = std::stoi(action[5]);
205   datatype1 = parse_datatype(action, 6);
206   datatype2 = parse_datatype(action, 7);
207 }
208
209 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 1, 2)
212   size      = parse_integer<size_t>(action[2]);
213   root      = parse_root(action, 3);
214   datatype1 = parse_datatype(action, 4);
215 }
216
217 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
218 {
219   CHECK_ACTION_PARAMS(action, 2, 2)
220   comm_size = parse_integer<unsigned>(action[2]);
221   comp_size = parse_double(action[3]);
222   root      = parse_root(action, 4);
223   datatype1 = parse_datatype(action, 5);
224 }
225
226 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
227 {
228   CHECK_ACTION_PARAMS(action, 2, 1)
229   comm_size = parse_integer<unsigned>(action[2]);
230   comp_size = parse_double(action[3]);
231   datatype1 = parse_datatype(action, 4);
232 }
233
234 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
235 {
236   CHECK_ACTION_PARAMS(action, 2, 1)
237   comm_size = MPI_COMM_WORLD->size();
238   send_size = parse_integer<int>(action[2]);
239   recv_size = parse_integer<int>(action[3]);
240   datatype1 = parse_datatype(action, 4);
241   datatype2 = parse_datatype(action, 5);
242 }
243
244 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
245 {
246   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
247         0 gather 68 68 0 0 0
248       where:
249         1) 68 is the sendcounts
250         2) 68 is the recvcounts
251         3) 0 is the root node
252         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
253         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
254   */
255   CHECK_ACTION_PARAMS(action, 2, 3)
256   comm_size = MPI_COMM_WORLD->size();
257   send_size = parse_integer<int>(action[2]);
258   recv_size = parse_integer<int>(action[3]);
259
260   if (name == "gather") {
261     root      = parse_root(action, 4);
262     datatype1 = parse_datatype(action, 5);
263     datatype2 = parse_datatype(action, 6);
264   } else {
265     root      = 0;
266     datatype1 = parse_datatype(action, 4);
267     datatype2 = parse_datatype(action, 5);
268   }
269 }
270
271 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
272 {
273   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
274        0 gather 68 68 10 10 10 0 0 0
275      where:
276        1) 68 is the sendcount
277        2) 68 10 10 10 is the recvcounts
278        3) 0 is the root node
279        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
280        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
281   */
282   comm_size = MPI_COMM_WORLD->size();
283   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
284   send_size  = parse_integer<int>(action[2]);
285   disps      = std::vector<int>(comm_size, 0);
286   recvcounts = std::make_shared<std::vector<int>>(comm_size);
287
288   if (name == "gatherv") {
289     root      = parse_root(action, 3 + comm_size);
290     datatype1 = parse_datatype(action, 4 + comm_size);
291     datatype2 = parse_datatype(action, 5 + comm_size);
292   } else {
293     root                = 0;
294     unsigned disp_index = 0;
295     /* The 3 comes from "0 gather <sendcount>", which must always be present.
296      * The + comm_size is the recvcounts array, which must also be present
297      */
298     if (action.size() > 3 + comm_size + comm_size) {
299       // datatype + disp are specified
300       datatype1  = parse_datatype(action, 3 + comm_size);
301       datatype2  = parse_datatype(action, 4 + comm_size);
302       disp_index = 5 + comm_size;
303     } else if (action.size() > 3 + comm_size + 2) {
304       // disps specified; datatype is not specified; use the default one
305       datatype1  = MPI_DEFAULT_TYPE;
306       datatype2  = MPI_DEFAULT_TYPE;
307       disp_index = 3 + comm_size;
308     } else {
309       // no disp specified, maybe only datatype,
310       datatype1 = parse_datatype(action, 3 + comm_size);
311       datatype2 = parse_datatype(action, 4 + comm_size);
312     }
313
314     if (disp_index != 0) {
315       xbt_assert(disp_index + comm_size <= action.size());
316       for (unsigned i = 0; i < comm_size; i++)
317         disps[i]          = std::stoi(action[disp_index + i]);
318     }
319   }
320
321   for (unsigned int i = 0; i < comm_size; i++) {
322     (*recvcounts)[i] = std::stoi(action[i + 3]);
323   }
324   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
325 }
326
327 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
328 {
329   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
330         0 gather 68 68 0 0 0
331       where:
332         1) 68 is the sendcounts
333         2) 68 is the recvcounts
334         3) 0 is the root node
335         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
336         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
337   */
338   comm_size = MPI_COMM_WORLD->size();
339   CHECK_ACTION_PARAMS(action, 2, 3)
340   comm_size = MPI_COMM_WORLD->size();
341   send_size = parse_integer<int>(action[2]);
342   recv_size = parse_integer<int>(action[3]);
343   root      = parse_root(action, 4);
344   datatype1 = parse_datatype(action, 5);
345   datatype2 = parse_datatype(action, 6);
346 }
347
348 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
349 {
350   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
351      0 gather 68 10 10 10 68 0 0 0
352       where:
353       1) 68 10 10 10 is the sendcounts
354       2) 68 is the recvcount
355       3) 0 is the root node
356       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
357       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
358   */
359   comm_size = MPI_COMM_WORLD->size();
360   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
361   recv_size  = parse_integer<int>(action[2 + comm_size]);
362   disps      = std::vector<int>(comm_size, 0);
363   sendcounts = std::make_shared<std::vector<int>>(comm_size);
364
365   root      = parse_root(action, 3 + comm_size);
366   datatype1 = parse_datatype(action, 4 + comm_size);
367   datatype2 = parse_datatype(action, 5 + comm_size);
368
369   for (unsigned int i = 0; i < comm_size; i++) {
370     (*sendcounts)[i] = std::stoi(action[i + 2]);
371   }
372   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
373 }
374
375 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
376 {
377   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
378        0 reducescatter 275427 275427 275427 204020 11346849 0
379      where:
380        1) The first four values after the name of the action declare the recvcounts array
381        2) The value 11346849 is the amount of instructions
382        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
383   */
384   comm_size = MPI_COMM_WORLD->size();
385   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
386   comp_size  = parse_double(action[2 + comm_size]);
387   recvcounts = std::make_shared<std::vector<int>>(comm_size);
388   datatype1  = parse_datatype(action, 3 + comm_size);
389
390   for (unsigned int i = 0; i < comm_size; i++) {
391     (*recvcounts)[i]= std::stoi(action[i + 2]);
392   }
393   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
394 }
395
396 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
397 {
398   CHECK_ACTION_PARAMS(action, 2, 1)
399   size      = parse_integer<size_t>(action[2]);
400   comp_size = parse_double(action[3]);
401   datatype1 = parse_datatype(action, 4);
402 }
403
404 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
405 {
406   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
407         0 alltoallv 100 1 7 10 12 100 1 70 10 5
408      where:
409       1) 100 is the size of the send buffer *sizeof(int),
410       2) 1 7 10 12 is the sendcounts array
411       3) 100*sizeof(int) is the size of the receiver buffer
412       4)  1 70 10 5 is the recvcounts array
413   */
414   comm_size = MPI_COMM_WORLD->size();
415   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
416   sendcounts = std::make_shared<std::vector<int>>(comm_size);
417   recvcounts = std::make_shared<std::vector<int>>(comm_size);
418   senddisps  = std::vector<int>(comm_size, 0);
419   recvdisps  = std::vector<int>(comm_size, 0);
420
421   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
422   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
423
424   send_buf_size = parse_integer<int>(action[2]);
425   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
426   for (unsigned int i = 0; i < comm_size; i++) {
427     (*sendcounts)[i] = std::stoi(action[3 + i]);
428     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
429   }
430   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
431   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
432 }
433
434 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
435 {
436   std::string s = boost::algorithm::join(action, " ");
437   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
438   const WaitTestParser& args = get_args();
439   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
440
441   if (request == MPI_REQUEST_NULL) {
442     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
443      * return.*/
444     return;
445   }
446
447   // Must be taken before Request::wait() since the request may be set to
448   // MPI_REQUEST_NULL by Request::wait!
449   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
450
451   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
452
453   MPI_Status status;
454   Request::wait(&request, &status);
455   if(request!=MPI_REQUEST_NULL)
456     Request::unref(&request);
457   TRACE_smpi_comm_out(get_pid());
458   if (is_wait_for_receive)
459     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
460 }
461
462 void SendAction::kernel(simgrid::xbt::ReplayAction&)
463 {
464   const SendOrRecvParser& args = get_args();
465   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
466
467   TRACE_smpi_comm_in(
468       get_pid(), __func__,
469       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
470   if (not TRACE_smpi_view_internals())
471     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
472
473   if (get_name() == "send") {
474     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
475   } else if (get_name() == "isend") {
476     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
477     req_storage.add(request);
478   } else {
479     xbt_die("Don't know this action, %s", get_name().c_str());
480   }
481
482   TRACE_smpi_comm_out(get_pid());
483 }
484
485 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
486 {
487   const SendOrRecvParser& args = get_args();
488   TRACE_smpi_comm_in(
489       get_pid(), __func__,
490       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
491
492   MPI_Status status;
493   // unknown size from the receiver point of view
494   ssize_t arg_size = args.size;
495   if (arg_size < 0) {
496     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
497     arg_size = status.count;
498   }
499
500   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
501   if (get_name() == "recv") {
502     is_recv = true;
503     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
504   } else if (get_name() == "irecv") {
505     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
506     req_storage.add(request);
507   } else {
508     THROW_IMPOSSIBLE;
509   }
510
511   TRACE_smpi_comm_out(get_pid());
512   if (is_recv && not TRACE_smpi_view_internals()) {
513     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
514     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
515   }
516 }
517
518 void SendRecvAction::kernel(simgrid::xbt::ReplayAction&)
519 {
520   XBT_DEBUG("Enters SendRecv");
521   const SendRecvParser& args = get_args();
522   aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
523   aid_t src_traced = MPI_COMM_WORLD->group()->actor(args.src);
524   aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.dst);
525
526   MPI_Status status;
527   int sendtag=0;
528   int recvtag=0;
529
530   // FIXME: Hack the way to trace this one
531   auto dst_hack = std::make_shared<std::vector<int>>();
532   auto src_hack = std::make_shared<std::vector<int>>();
533   dst_hack->push_back(dst_traced);
534   src_hack->push_back(src_traced);
535   TRACE_smpi_comm_in(my_proc_id, __func__,
536                        new simgrid::instr::VarCollTIData(
537                            "sendRecv", -1, args.sendcount,
538                          dst_hack, args.recvcount, src_hack,
539                            simgrid::smpi::Datatype::encode(args.datatype1), simgrid::smpi::Datatype::encode(args.datatype2)));
540
541   TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, sendtag, args.sendcount * args.datatype1->size());
542
543   simgrid::smpi::Request::sendrecv(nullptr, args.sendcount, args.datatype1, args.dst, sendtag, nullptr, args.recvcount, args.datatype2, args.src,
544                                      recvtag, MPI_COMM_WORLD, &status);
545
546   TRACE_smpi_recv(src_traced, my_proc_id, recvtag);
547   TRACE_smpi_comm_out(my_proc_id);
548   XBT_DEBUG("Exits SendRecv");
549 }
550
551 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
552 {
553   const ComputeParser& args = get_args();
554   if (smpi_cfg_simulate_computation()) {
555     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
556   }
557 }
558
559 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
560 {
561   const SleepParser& args = get_args();
562   XBT_DEBUG("Sleep for: %lf secs", args.time);
563   aid_t pid = simgrid::s4u::this_actor::get_pid();
564   TRACE_smpi_sleeping_in(pid, args.time);
565   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
566   TRACE_smpi_sleeping_out(pid);
567 }
568
569 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
570 {
571   const LocationParser& args = get_args();
572   smpi_trace_set_call_location(args.filename.c_str(), args.line, "replay_action");
573 }
574
575 void TestAction::kernel(simgrid::xbt::ReplayAction&)
576 {
577   const WaitTestParser& args = get_args();
578   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
579   // if request is null here, this may mean that a previous test has succeeded
580   // Different times in traced application and replayed version may lead to this
581   // In this case, ignore the extra calls.
582   if (request != MPI_REQUEST_NULL) {
583     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
584
585     MPI_Status status;
586     int flag = 0;
587     Request::test(&request, &status, &flag);
588
589     XBT_DEBUG("MPI_Test result: %d", flag);
590     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
591      * nullptr.*/
592     if (request == MPI_REQUEST_NULL)
593       req_storage.addNullRequest(args.src, args.dst, args.tag);
594     else
595       req_storage.add(request);
596
597     TRACE_smpi_comm_out(get_pid());
598   }
599 }
600
601 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
602 {
603   CHECK_ACTION_PARAMS(action, 0, 1)
604     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
605     : MPI_BYTE;  // default TAU datatype
606
607   /* start a simulated timer */
608   smpi_process()->simulated_start();
609 }
610
611 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
612 {
613   /* nothing to do */
614 }
615
616 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
617 {
618   if (req_storage.size() > 0) {
619     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
620     std::vector<MPI_Request> reqs;
621     req_storage.get_requests(reqs);
622     unsigned long count_requests = reqs.size();
623     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
624     for (auto const& req : reqs) {
625       if (req && (req->flags() & MPI_REQ_RECV)) {
626         sender_receiver.emplace_back(req->src(), req->dst());
627       }
628     }
629     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
630     req_storage.get_store().clear();
631
632     for (MPI_Request& req : reqs)
633       if (req != MPI_REQUEST_NULL)
634         Request::unref(&req);
635
636     for (auto const& [src, dst] : sender_receiver) {
637       TRACE_smpi_recv(src, dst, 0);
638     }
639     TRACE_smpi_comm_out(get_pid());
640   }
641 }
642
643 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
644 {
645   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
646   colls::barrier(MPI_COMM_WORLD);
647   TRACE_smpi_comm_out(get_pid());
648 }
649
650 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
651 {
652   const BcastArgParser& args = get_args();
653   TRACE_smpi_comm_in(get_pid(), "action_bcast",
654                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
655                                                     0, Datatype::encode(args.datatype1), ""));
656
657   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
658
659   TRACE_smpi_comm_out(get_pid());
660 }
661
662 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
663 {
664   const ReduceArgParser& args = get_args();
665   TRACE_smpi_comm_in(get_pid(), "action_reduce",
666                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
667                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
668
669   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
670                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
671                 args.root, MPI_COMM_WORLD);
672   if (args.comp_size != 0.0)
673     simgrid::s4u::this_actor::exec_init(args.comp_size)
674       ->set_name("computation")
675       ->start()
676       ->wait();
677
678   TRACE_smpi_comm_out(get_pid());
679 }
680
681 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
682 {
683   const AllReduceArgParser& args = get_args();
684   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
685                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
686                                                     Datatype::encode(args.datatype1), ""));
687
688   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
689                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
690                    MPI_COMM_WORLD);
691   if (args.comp_size != 0.0)
692     simgrid::s4u::this_actor::exec_init(args.comp_size)
693       ->set_name("computation")
694       ->start()
695       ->wait();
696
697   TRACE_smpi_comm_out(get_pid());
698 }
699
700 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
701 {
702   const AllToAllArgParser& args = get_args();
703   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
704                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
705                                                     Datatype::encode(args.datatype1),
706                                                     Datatype::encode(args.datatype2)));
707
708   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
709                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
710                   MPI_COMM_WORLD);
711
712   TRACE_smpi_comm_out(get_pid());
713 }
714
715 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
716 {
717   const GatherArgParser& args = get_args();
718   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
719                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
720                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
721                                                     Datatype::encode(args.datatype2)));
722
723   if (get_name() == "gather") {
724     int rank = MPI_COMM_WORLD->rank();
725     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
726                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
727                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
728   } else
729     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
730                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
731                      MPI_COMM_WORLD);
732
733   TRACE_smpi_comm_out(get_pid());
734 }
735
736 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
737 {
738   int rank = MPI_COMM_WORLD->rank();
739   const GatherVArgParser& args = get_args();
740   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
741                      new simgrid::instr::VarCollTIData(
742                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
743                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
744
745   if (get_name() == "gatherv") {
746     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
747                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
748                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
749   } else {
750     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
751                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
752                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
753   }
754
755   TRACE_smpi_comm_out(get_pid());
756 }
757
758 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
759 {
760   int rank = MPI_COMM_WORLD->rank();
761   const ScatterArgParser& args = get_args();
762   TRACE_smpi_comm_in(get_pid(), "action_scatter",
763                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
764                                                     Datatype::encode(args.datatype1),
765                                                     Datatype::encode(args.datatype2)));
766
767   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
768                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
769                  args.datatype2, args.root, MPI_COMM_WORLD);
770
771   TRACE_smpi_comm_out(get_pid());
772 }
773
774 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
775 {
776   int rank = MPI_COMM_WORLD->rank();
777   const ScatterVArgParser& args = get_args();
778   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
779                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
780                                                        nullptr, Datatype::encode(args.datatype1),
781                                                        Datatype::encode(args.datatype2)));
782
783   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
784                   args.sendcounts->data(), args.disps.data(), args.datatype1,
785                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
786                   MPI_COMM_WORLD);
787
788   TRACE_smpi_comm_out(get_pid());
789 }
790
791 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
792 {
793   const ReduceScatterArgParser& args = get_args();
794   TRACE_smpi_comm_in(
795       get_pid(), "action_reducescatter",
796       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
797                                         /* ugly as we use datatype field to pass computation as string */
798                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
799                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
800                                         Datatype::encode(args.datatype1)));
801
802   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
803                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
804                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
805   if (args.comp_size != 0.0)
806     simgrid::s4u::this_actor::exec_init(args.comp_size)
807       ->set_name("computation")
808       ->start()
809       ->wait();
810   TRACE_smpi_comm_out(get_pid());
811 }
812
813 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
814 {
815   const ScanArgParser& args = get_args();
816   TRACE_smpi_comm_in(get_pid(), "action_scan",
817                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
818                      args.size, 0, Datatype::encode(args.datatype1), ""));
819   if (get_name() == "scan")
820     colls::scan(send_buffer(args.size * args.datatype1->size()),
821               recv_buffer(args.size * args.datatype1->size()), args.size,
822               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
823   else
824     colls::exscan(send_buffer(args.size * args.datatype1->size()),
825               recv_buffer(args.size * args.datatype1->size()), args.size,
826               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
827
828   if (args.comp_size != 0.0)
829     simgrid::s4u::this_actor::exec_init(args.comp_size)
830       ->set_name("computation")
831       ->start()
832       ->wait();
833   TRACE_smpi_comm_out(get_pid());
834 }
835
836 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
837 {
838   const AllToAllVArgParser& args = get_args();
839   TRACE_smpi_comm_in(get_pid(), __func__,
840                      new simgrid::instr::VarCollTIData(
841                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
842                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
843
844   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
845                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
846                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
847
848   TRACE_smpi_comm_out(get_pid());
849 }
850 } // namespace simgrid::smpi::replay
851
852 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
853 /** @brief Only initialize the replay, don't do it for real */
854 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
855 {
856   xbt_assert(not smpi_process()->initializing());
857
858   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
859   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
860   simgrid::smpi::ActorExt::init();
861
862   smpi_process()->mark_as_initialized();
863   smpi_process()->set_replaying(true);
864
865   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
866   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
867   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
868   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
869   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
870   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
871   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
872   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
873   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
874   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
875   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
876   xbt_replay_action_register("sendRecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendRecvAction().execute(action); });
877   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
878   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
879   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
880   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
881   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
882   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
883   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
884   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
885   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
886   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
887   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
888   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
889   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
890   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
891   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
892   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
893   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
894   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
895   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
896   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
897
898   //if we have a delayed start, sleep here.
899   if (start_delay_flops > 0) {
900     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
901     private_execute_flops(start_delay_flops);
902   } else {
903     // Wait for the other actors to initialize also
904     simgrid::s4u::this_actor::yield();
905   }
906   if(_smpi_init_sleep > 0)
907     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
908 }
909
910 /** @brief actually run the replay after initialization */
911 void smpi_replay_main(int rank, const char* private_trace_filename)
912 {
913   static int active_processes = 0;
914   active_processes++;
915   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
916   std::string rank_string                      = std::to_string(rank);
917   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
918
919   /* and now, finalize everything */
920   /* One active process will stop. Decrease the counter*/
921   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
922   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
923   if (count_requests > 0) {
924     std::vector<MPI_Request> requests(count_requests);
925     unsigned int i=0;
926
927     for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
928       for (const auto& req : reqs) {
929         requests[i] = req; // FIXME: overwritten at each iteration?
930       }
931       i++;
932     }
933     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
934   }
935
936   if (simgrid::config::get_value<bool>("smpi/barrier-finalization"))
937     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
938
939   active_processes--;
940
941   if(active_processes==0){
942     /* Last process alive speaking: end the simulated timer */
943     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
944     smpi_free_replay_tmp_buffers();
945   }
946
947   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
948                      new simgrid::instr::NoOpTIData("finalize"));
949
950   smpi_process()->finalize();
951
952   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
953 }
954
955 /** @brief chain a replay initialization and a replay start */
956 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
957 {
958   smpi_replay_init(instance_id, rank, start_delay_flops);
959   smpi_replay_main(rank, private_trace_filename);
960 }