Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'wifi_clean' into 'master'
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16 #include "xbt/str.h"
17
18 #include <cmath>
19 #include <limits>
20 #include <memory>
21 #include <numeric>
22 #include <tuple>
23 #include <unordered_map>
24 #include <vector>
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
30 namespace hash_tuple {
31 template <typename TT> class hash {
32 public:
33   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
34 };
35
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
37 {
38   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
39 }
40
41 // Recursive template code derived from Matthieu M.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
43 public:
44   static void apply(size_t& seed, Tuple const& tuple)
45   {
46     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47     hash_combine(seed, std::get<Index>(tuple));
48   }
49 };
50
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
52 public:
53   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
54 };
55
56 template <typename... TT> class hash<std::tuple<TT...>> {
57 public:
58   size_t operator()(std::tuple<TT...> const& tt) const
59   {
60     size_t seed = 0;
61     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
62     return seed;
63   }
64 };
65 }
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper functions */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "not a double");
79 }
80
81 template <typename T> static T parse_integer(const std::string& string)
82 {
83   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85                  val <= static_cast<double>(std::numeric_limits<T>::max()),
86              "out of range: %g", val);
87   return static_cast<T>(val);
88 }
89
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 {
92   return i < action.size() ? std::stoi(action[i]) : 0;
93 }
94
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 {
97   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
98 }
99
100 namespace simgrid::smpi::replay {
101 MPI_Datatype MPI_DEFAULT_TYPE;
102
103 class RequestStorage {
104 private:
105   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
106   using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
107
108   req_storage_t store;
109
110 public:
111   RequestStorage() = default;
112   size_t size() const { return store.size(); }
113
114   req_storage_t& get_store() { return store; }
115
116   void get_requests(std::vector<MPI_Request>& vec) const
117   {
118     for (auto const& [_, reqs] : store) {
119       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
120       for (auto& req: reqs){
121         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
122           vec.push_back(req);
123           req->print_request("MM");
124         }
125       }
126     }
127   }
128
129   MPI_Request pop(int src, int dst, int tag)
130   {
131     auto it = store.find(req_key_t(src, dst, tag));
132     if (it == store.end())
133      return MPI_REQUEST_NULL;
134     MPI_Request req = it->second.front();
135     it->second.pop_front();
136     if(it->second.empty())
137       store.erase(req_key_t(src, dst, tag));
138     return req;
139   }
140
141   void add(MPI_Request req)
142   {
143     if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
144       store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
145     }
146   }
147
148   /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149   void addNullRequest(int src, int dst, int tag)
150   {
151     int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
152     int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
153     store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
154   }
155 };
156
157 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 {
159   CHECK_ACTION_PARAMS(action, 3, 0)
160   src = std::stoi(action[2]);
161   dst = std::stoi(action[3]);
162   tag = std::stoi(action[4]);
163 }
164
165 void SendOrRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 {
167   CHECK_ACTION_PARAMS(action, 3, 1)
168   partner = std::stoi(action[2]);
169   tag     = std::stoi(action[3]);
170   size      = parse_integer<ssize_t>(action[4]);
171   datatype1 = parse_datatype(action, 5);
172 }
173
174 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 1, 0)
177   flops = parse_double(action[2]);
178 }
179
180 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 {
182   CHECK_ACTION_PARAMS(action, 1, 0)
183   time = parse_double(action[2]);
184 }
185
186 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 {
188   CHECK_ACTION_PARAMS(action, 2, 0)
189   filename = std::string(action[2]);
190   line = std::stoi(action[3]);
191 }
192
193 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 {
195   CHECK_ACTION_PARAMS(action, 6, 0)
196   sendcount = parse_integer<int>(action[2]);
197   dst = std::stoi(action[3]);
198   recvcount = parse_integer<int>(action[4]);
199   src = std::stoi(action[5]);
200   datatype1 = parse_datatype(action, 6);
201   datatype2 = parse_datatype(action, 7);
202 }
203
204 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
205 {
206   CHECK_ACTION_PARAMS(action, 1, 2)
207   size      = parse_integer<size_t>(action[2]);
208   root      = parse_root(action, 3);
209   datatype1 = parse_datatype(action, 4);
210 }
211
212 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
213 {
214   CHECK_ACTION_PARAMS(action, 2, 2)
215   comm_size = parse_integer<unsigned>(action[2]);
216   comp_size = parse_double(action[3]);
217   root      = parse_root(action, 4);
218   datatype1 = parse_datatype(action, 5);
219 }
220
221 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
222 {
223   CHECK_ACTION_PARAMS(action, 2, 1)
224   comm_size = parse_integer<unsigned>(action[2]);
225   comp_size = parse_double(action[3]);
226   datatype1 = parse_datatype(action, 4);
227 }
228
229 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
230 {
231   CHECK_ACTION_PARAMS(action, 2, 1)
232   comm_size = MPI_COMM_WORLD->size();
233   send_size = parse_integer<int>(action[2]);
234   recv_size = parse_integer<int>(action[3]);
235   datatype1 = parse_datatype(action, 4);
236   datatype2 = parse_datatype(action, 5);
237 }
238
239 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
240 {
241   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
242         0 gather 68 68 0 0 0
243       where:
244         1) 68 is the sendcounts
245         2) 68 is the recvcounts
246         3) 0 is the root node
247         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
249   */
250   CHECK_ACTION_PARAMS(action, 2, 3)
251   comm_size = MPI_COMM_WORLD->size();
252   send_size = parse_integer<int>(action[2]);
253   recv_size = parse_integer<int>(action[3]);
254
255   if (name == "gather") {
256     root      = parse_root(action, 4);
257     datatype1 = parse_datatype(action, 5);
258     datatype2 = parse_datatype(action, 6);
259   } else {
260     root      = 0;
261     datatype1 = parse_datatype(action, 4);
262     datatype2 = parse_datatype(action, 5);
263   }
264 }
265
266 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
267 {
268   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
269        0 gather 68 68 10 10 10 0 0 0
270      where:
271        1) 68 is the sendcount
272        2) 68 10 10 10 is the recvcounts
273        3) 0 is the root node
274        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
275        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
276   */
277   comm_size = MPI_COMM_WORLD->size();
278   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
279   send_size  = parse_integer<int>(action[2]);
280   disps      = std::vector<int>(comm_size, 0);
281   recvcounts = std::make_shared<std::vector<int>>(comm_size);
282
283   if (name == "gatherv") {
284     root      = parse_root(action, 3 + comm_size);
285     datatype1 = parse_datatype(action, 4 + comm_size);
286     datatype2 = parse_datatype(action, 5 + comm_size);
287   } else {
288     root                = 0;
289     unsigned disp_index = 0;
290     /* The 3 comes from "0 gather <sendcount>", which must always be present.
291      * The + comm_size is the recvcounts array, which must also be present
292      */
293     if (action.size() > 3 + comm_size + comm_size) {
294       // datatype + disp are specified
295       datatype1  = parse_datatype(action, 3 + comm_size);
296       datatype2  = parse_datatype(action, 4 + comm_size);
297       disp_index = 5 + comm_size;
298     } else if (action.size() > 3 + comm_size + 2) {
299       // disps specified; datatype is not specified; use the default one
300       datatype1  = MPI_DEFAULT_TYPE;
301       datatype2  = MPI_DEFAULT_TYPE;
302       disp_index = 3 + comm_size;
303     } else {
304       // no disp specified, maybe only datatype,
305       datatype1 = parse_datatype(action, 3 + comm_size);
306       datatype2 = parse_datatype(action, 4 + comm_size);
307     }
308
309     if (disp_index != 0) {
310       xbt_assert(disp_index + comm_size <= action.size());
311       for (unsigned i = 0; i < comm_size; i++)
312         disps[i]          = std::stoi(action[disp_index + i]);
313     }
314   }
315
316   for (unsigned int i = 0; i < comm_size; i++) {
317     (*recvcounts)[i] = std::stoi(action[i + 3]);
318   }
319   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
320 }
321
322 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
323 {
324   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
325         0 gather 68 68 0 0 0
326       where:
327         1) 68 is the sendcounts
328         2) 68 is the recvcounts
329         3) 0 is the root node
330         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
331         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
332   */
333   comm_size = MPI_COMM_WORLD->size();
334   CHECK_ACTION_PARAMS(action, 2, 3)
335   comm_size = MPI_COMM_WORLD->size();
336   send_size = parse_integer<int>(action[2]);
337   recv_size = parse_integer<int>(action[3]);
338   root      = parse_root(action, 4);
339   datatype1 = parse_datatype(action, 5);
340   datatype2 = parse_datatype(action, 6);
341 }
342
343 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
344 {
345   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
346      0 gather 68 10 10 10 68 0 0 0
347       where:
348       1) 68 10 10 10 is the sendcounts
349       2) 68 is the recvcount
350       3) 0 is the root node
351       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
352       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
353   */
354   comm_size = MPI_COMM_WORLD->size();
355   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
356   recv_size  = parse_integer<int>(action[2 + comm_size]);
357   disps      = std::vector<int>(comm_size, 0);
358   sendcounts = std::make_shared<std::vector<int>>(comm_size);
359
360   root      = parse_root(action, 3 + comm_size);
361   datatype1 = parse_datatype(action, 4 + comm_size);
362   datatype2 = parse_datatype(action, 5 + comm_size);
363
364   for (unsigned int i = 0; i < comm_size; i++) {
365     (*sendcounts)[i] = std::stoi(action[i + 2]);
366   }
367   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
368 }
369
370 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
371 {
372   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
373        0 reducescatter 275427 275427 275427 204020 11346849 0
374      where:
375        1) The first four values after the name of the action declare the recvcounts array
376        2) The value 11346849 is the amount of instructions
377        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
378   */
379   comm_size = MPI_COMM_WORLD->size();
380   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
381   comp_size  = parse_double(action[2 + comm_size]);
382   recvcounts = std::make_shared<std::vector<int>>(comm_size);
383   datatype1  = parse_datatype(action, 3 + comm_size);
384
385   for (unsigned int i = 0; i < comm_size; i++) {
386     (*recvcounts)[i]= std::stoi(action[i + 2]);
387   }
388   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
389 }
390
391 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
392 {
393   CHECK_ACTION_PARAMS(action, 2, 1)
394   size      = parse_integer<size_t>(action[2]);
395   comp_size = parse_double(action[3]);
396   datatype1 = parse_datatype(action, 4);
397 }
398
399 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
400 {
401   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
402         0 alltoallv 100 1 7 10 12 100 1 70 10 5
403      where:
404       1) 100 is the size of the send buffer *sizeof(int),
405       2) 1 7 10 12 is the sendcounts array
406       3) 100*sizeof(int) is the size of the receiver buffer
407       4)  1 70 10 5 is the recvcounts array
408   */
409   comm_size = MPI_COMM_WORLD->size();
410   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
411   sendcounts = std::make_shared<std::vector<int>>(comm_size);
412   recvcounts = std::make_shared<std::vector<int>>(comm_size);
413   senddisps  = std::vector<int>(comm_size, 0);
414   recvdisps  = std::vector<int>(comm_size, 0);
415
416   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
417   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
418
419   send_buf_size = parse_integer<int>(action[2]);
420   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
421   for (unsigned int i = 0; i < comm_size; i++) {
422     (*sendcounts)[i] = std::stoi(action[3 + i]);
423     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
424   }
425   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
426   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
427 }
428
429 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
430 {
431   std::string s = boost::algorithm::join(action, " ");
432   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
433   const WaitTestParser& args = get_args();
434   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
435
436   if (request == MPI_REQUEST_NULL) {
437     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
438      * return.*/
439     return;
440   }
441
442   // Must be taken before Request::wait() since the request may be set to
443   // MPI_REQUEST_NULL by Request::wait!
444   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
445
446   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
447
448   MPI_Status status;
449   Request::wait(&request, &status);
450   if(request!=MPI_REQUEST_NULL)
451     Request::unref(&request);
452   TRACE_smpi_comm_out(get_pid());
453   if (is_wait_for_receive)
454     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
455 }
456
457 void SendAction::kernel(simgrid::xbt::ReplayAction&)
458 {
459   const SendOrRecvParser& args = get_args();
460   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
461
462   TRACE_smpi_comm_in(
463       get_pid(), __func__,
464       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
465   if (not TRACE_smpi_view_internals())
466     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
467
468   if (get_name() == "send") {
469     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
470   } else if (get_name() == "isend") {
471     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
472     req_storage.add(request);
473   } else {
474     xbt_die("Don't know this action, %s", get_name().c_str());
475   }
476
477   TRACE_smpi_comm_out(get_pid());
478 }
479
480 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
481 {
482   const SendOrRecvParser& args = get_args();
483   TRACE_smpi_comm_in(
484       get_pid(), __func__,
485       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
486
487   MPI_Status status;
488   // unknown size from the receiver point of view
489   ssize_t arg_size = args.size;
490   if (arg_size < 0) {
491     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
492     arg_size = status.count;
493   }
494
495   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
496   if (get_name() == "recv") {
497     is_recv = true;
498     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
499   } else if (get_name() == "irecv") {
500     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
501     req_storage.add(request);
502   } else {
503     THROW_IMPOSSIBLE;
504   }
505
506   TRACE_smpi_comm_out(get_pid());
507   if (is_recv && not TRACE_smpi_view_internals()) {
508     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
509     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
510   }
511 }
512
513 void SendRecvAction::kernel(simgrid::xbt::ReplayAction&)
514 {
515   XBT_DEBUG("Enters SendRecv");
516   const SendRecvParser& args = get_args();
517   aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
518   aid_t src_traced = MPI_COMM_WORLD->group()->actor(args.src);
519   aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.dst);
520
521   MPI_Status status;
522   int sendtag=0;
523   int recvtag=0;
524
525   // FIXME: Hack the way to trace this one
526   auto dst_hack = std::make_shared<std::vector<int>>();
527   auto src_hack = std::make_shared<std::vector<int>>();
528   dst_hack->push_back(dst_traced);
529   src_hack->push_back(src_traced);
530   TRACE_smpi_comm_in(my_proc_id, __func__,
531                        new simgrid::instr::VarCollTIData(
532                            "sendRecv", -1, args.sendcount,
533                          dst_hack, args.recvcount, src_hack,
534                            simgrid::smpi::Datatype::encode(args.datatype1), simgrid::smpi::Datatype::encode(args.datatype2)));
535
536   TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, sendtag, args.sendcount * args.datatype1->size());
537
538   simgrid::smpi::Request::sendrecv(nullptr, args.sendcount, args.datatype1, args.dst, sendtag, nullptr, args.recvcount, args.datatype2, args.src,
539                                      recvtag, MPI_COMM_WORLD, &status);
540
541   TRACE_smpi_recv(src_traced, my_proc_id, recvtag);
542   TRACE_smpi_comm_out(my_proc_id);
543   XBT_DEBUG("Exits SendRecv");
544 }
545
546 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
547 {
548   const ComputeParser& args = get_args();
549   if (smpi_cfg_simulate_computation()) {
550     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
551   }
552 }
553
554 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
555 {
556   const SleepParser& args = get_args();
557   XBT_DEBUG("Sleep for: %lf secs", args.time);
558   aid_t pid = simgrid::s4u::this_actor::get_pid();
559   TRACE_smpi_sleeping_in(pid, args.time);
560   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
561   TRACE_smpi_sleeping_out(pid);
562 }
563
564 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
565 {
566   const LocationParser& args = get_args();
567   smpi_trace_set_call_location(args.filename.c_str(), args.line);
568 }
569
570 void TestAction::kernel(simgrid::xbt::ReplayAction&)
571 {
572   const WaitTestParser& args = get_args();
573   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
574   // if request is null here, this may mean that a previous test has succeeded
575   // Different times in traced application and replayed version may lead to this
576   // In this case, ignore the extra calls.
577   if (request != MPI_REQUEST_NULL) {
578     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
579
580     MPI_Status status;
581     int flag = 0;
582     Request::test(&request, &status, &flag);
583
584     XBT_DEBUG("MPI_Test result: %d", flag);
585     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
586      * nullptr.*/
587     if (request == MPI_REQUEST_NULL)
588       req_storage.addNullRequest(args.src, args.dst, args.tag);
589     else
590       req_storage.add(request);
591
592     TRACE_smpi_comm_out(get_pid());
593   }
594 }
595
596 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
597 {
598   CHECK_ACTION_PARAMS(action, 0, 1)
599     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
600     : MPI_BYTE;  // default TAU datatype
601
602   /* start a simulated timer */
603   smpi_process()->simulated_start();
604 }
605
606 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
607 {
608   /* nothing to do */
609 }
610
611 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
612 {
613   if (req_storage.size() > 0) {
614     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
615     std::vector<MPI_Request> reqs;
616     req_storage.get_requests(reqs);
617     unsigned long count_requests = reqs.size();
618     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
619     for (auto const& req : reqs) {
620       if (req && (req->flags() & MPI_REQ_RECV)) {
621         sender_receiver.emplace_back(req->src(), req->dst());
622       }
623     }
624     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
625     req_storage.get_store().clear();
626
627     for (MPI_Request& req : reqs)
628       if (req != MPI_REQUEST_NULL)
629         Request::unref(&req);
630
631     for (auto const& [src, dst] : sender_receiver) {
632       TRACE_smpi_recv(src, dst, 0);
633     }
634     TRACE_smpi_comm_out(get_pid());
635   }
636 }
637
638 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
639 {
640   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
641   colls::barrier(MPI_COMM_WORLD);
642   TRACE_smpi_comm_out(get_pid());
643 }
644
645 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
646 {
647   const BcastArgParser& args = get_args();
648   TRACE_smpi_comm_in(get_pid(), "action_bcast",
649                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
650                                                     0, Datatype::encode(args.datatype1), ""));
651
652   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
653
654   TRACE_smpi_comm_out(get_pid());
655 }
656
657 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
658 {
659   const ReduceArgParser& args = get_args();
660   TRACE_smpi_comm_in(get_pid(), "action_reduce",
661                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
662                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
663
664   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
665                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
666                 args.root, MPI_COMM_WORLD);
667   if (args.comp_size != 0.0)
668     simgrid::s4u::this_actor::exec_init(args.comp_size)
669       ->set_name("computation")
670       ->start()
671       ->wait();
672
673   TRACE_smpi_comm_out(get_pid());
674 }
675
676 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
677 {
678   const AllReduceArgParser& args = get_args();
679   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
680                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
681                                                     Datatype::encode(args.datatype1), ""));
682
683   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
684                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
685                    MPI_COMM_WORLD);
686   if (args.comp_size != 0.0)
687     simgrid::s4u::this_actor::exec_init(args.comp_size)
688       ->set_name("computation")
689       ->start()
690       ->wait();
691
692   TRACE_smpi_comm_out(get_pid());
693 }
694
695 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
696 {
697   const AllToAllArgParser& args = get_args();
698   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
699                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
700                                                     Datatype::encode(args.datatype1),
701                                                     Datatype::encode(args.datatype2)));
702
703   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
704                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
705                   MPI_COMM_WORLD);
706
707   TRACE_smpi_comm_out(get_pid());
708 }
709
710 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
711 {
712   const GatherArgParser& args = get_args();
713   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
714                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
715                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
716                                                     Datatype::encode(args.datatype2)));
717
718   if (get_name() == "gather") {
719     int rank = MPI_COMM_WORLD->rank();
720     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
721                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
722                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
723   } else
724     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
725                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
726                      MPI_COMM_WORLD);
727
728   TRACE_smpi_comm_out(get_pid());
729 }
730
731 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
732 {
733   int rank = MPI_COMM_WORLD->rank();
734   const GatherVArgParser& args = get_args();
735   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
736                      new simgrid::instr::VarCollTIData(
737                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
738                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
739
740   if (get_name() == "gatherv") {
741     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
742                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
743                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
744   } else {
745     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
746                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
747                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
748   }
749
750   TRACE_smpi_comm_out(get_pid());
751 }
752
753 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
754 {
755   int rank = MPI_COMM_WORLD->rank();
756   const ScatterArgParser& args = get_args();
757   TRACE_smpi_comm_in(get_pid(), "action_scatter",
758                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
759                                                     Datatype::encode(args.datatype1),
760                                                     Datatype::encode(args.datatype2)));
761
762   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
763                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
764                  args.datatype2, args.root, MPI_COMM_WORLD);
765
766   TRACE_smpi_comm_out(get_pid());
767 }
768
769 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
770 {
771   int rank = MPI_COMM_WORLD->rank();
772   const ScatterVArgParser& args = get_args();
773   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
774                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
775                                                        nullptr, Datatype::encode(args.datatype1),
776                                                        Datatype::encode(args.datatype2)));
777
778   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
779                   args.sendcounts->data(), args.disps.data(), args.datatype1,
780                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
781                   MPI_COMM_WORLD);
782
783   TRACE_smpi_comm_out(get_pid());
784 }
785
786 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
787 {
788   const ReduceScatterArgParser& args = get_args();
789   TRACE_smpi_comm_in(
790       get_pid(), "action_reducescatter",
791       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
792                                         /* ugly as we use datatype field to pass computation as string */
793                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
794                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
795                                         Datatype::encode(args.datatype1)));
796
797   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
798                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
799                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
800   if (args.comp_size != 0.0)
801     simgrid::s4u::this_actor::exec_init(args.comp_size)
802       ->set_name("computation")
803       ->start()
804       ->wait();
805   TRACE_smpi_comm_out(get_pid());
806 }
807
808 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
809 {
810   const ScanArgParser& args = get_args();
811   TRACE_smpi_comm_in(get_pid(), "action_scan",
812                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
813                      args.size, 0, Datatype::encode(args.datatype1), ""));
814   if (get_name() == "scan")
815     colls::scan(send_buffer(args.size * args.datatype1->size()),
816               recv_buffer(args.size * args.datatype1->size()), args.size,
817               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
818   else
819     colls::exscan(send_buffer(args.size * args.datatype1->size()),
820               recv_buffer(args.size * args.datatype1->size()), args.size,
821               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
822
823   if (args.comp_size != 0.0)
824     simgrid::s4u::this_actor::exec_init(args.comp_size)
825       ->set_name("computation")
826       ->start()
827       ->wait();
828   TRACE_smpi_comm_out(get_pid());
829 }
830
831 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
832 {
833   const AllToAllVArgParser& args = get_args();
834   TRACE_smpi_comm_in(get_pid(), __func__,
835                      new simgrid::instr::VarCollTIData(
836                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
837                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
838
839   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
840                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
841                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
842
843   TRACE_smpi_comm_out(get_pid());
844 }
845 } // namespace simgrid::smpi::replay
846
847 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
848 /** @brief Only initialize the replay, don't do it for real */
849 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
850 {
851   xbt_assert(not smpi_process()->initializing());
852
853   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
854   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
855   simgrid::smpi::ActorExt::init();
856
857   smpi_process()->mark_as_initialized();
858   smpi_process()->set_replaying(true);
859
860   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
861   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
862   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
863   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
864   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
865   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
866   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
867   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
868   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
869   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
870   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
871   xbt_replay_action_register("sendRecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendRecvAction().execute(action); });
872   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
873   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
874   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
875   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
876   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
877   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
878   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
879   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
880   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
881   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
882   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
883   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
884   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
885   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
886   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
887   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
888   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
889   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
890   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
891   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
892
893   //if we have a delayed start, sleep here.
894   if (start_delay_flops > 0) {
895     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
896     private_execute_flops(start_delay_flops);
897   } else {
898     // Wait for the other actors to initialize also
899     simgrid::s4u::this_actor::yield();
900   }
901   if(_smpi_init_sleep > 0)
902     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
903 }
904
905 /** @brief actually run the replay after initialization */
906 void smpi_replay_main(int rank, const char* private_trace_filename)
907 {
908   static int active_processes = 0;
909   active_processes++;
910   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
911   std::string rank_string                      = std::to_string(rank);
912   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
913
914   /* and now, finalize everything */
915   /* One active process will stop. Decrease the counter*/
916   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
917   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
918   if (count_requests > 0) {
919     std::vector<MPI_Request> requests(count_requests);
920     unsigned int i=0;
921
922     for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
923       for (auto& req : reqs) {
924         requests[i] = req; // FIXME: overwritten at each iteration?
925       }
926       i++;
927     }
928     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
929   }
930
931   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
932     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
933
934   active_processes--;
935
936   if(active_processes==0){
937     /* Last process alive speaking: end the simulated timer */
938     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
939     smpi_free_replay_tmp_buffers();
940   }
941
942   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
943                      new simgrid::instr::NoOpTIData("finalize"));
944
945   smpi_process()->finalize();
946
947   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
948 }
949
950 /** @brief chain a replay initialization and a replay start */
951 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
952 {
953   smpi_replay_init(instance_id, rank, start_delay_flops);
954   smpi_replay_main(rank, private_trace_filename);
955 }