Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Reduce dependencies on <simgrid/version.h>.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16 #include "xbt/str.h"
17
18 #include <cmath>
19 #include <limits>
20 #include <memory>
21 #include <numeric>
22 #include <tuple>
23 #include <unordered_map>
24 #include <vector>
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
30 namespace hash_tuple {
31 template <typename TT> class hash {
32 public:
33   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
34 };
35
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
37 {
38   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
39 }
40
41 // Recursive template code derived from Matthieu M.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
43 public:
44   static void apply(size_t& seed, Tuple const& tuple)
45   {
46     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47     hash_combine(seed, std::get<Index>(tuple));
48   }
49 };
50
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
52 public:
53   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
54 };
55
56 template <typename... TT> class hash<std::tuple<TT...>> {
57 public:
58   size_t operator()(std::tuple<TT...> const& tt) const
59   {
60     size_t seed = 0;
61     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
62     return seed;
63   }
64 };
65 }
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper functions */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "not a double");
79 }
80
81 template <typename T> static T parse_integer(const std::string& string)
82 {
83   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85                  val <= static_cast<double>(std::numeric_limits<T>::max()),
86              "out of range: %g", val);
87   return static_cast<T>(val);
88 }
89
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 {
92   return i < action.size() ? std::stoi(action[i]) : 0;
93 }
94
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 {
97   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
98 }
99
100 namespace simgrid {
101 namespace smpi {
102
103 namespace replay {
104 MPI_Datatype MPI_DEFAULT_TYPE;
105
106 class RequestStorage {
107 private:
108   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
109   using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
110
111   req_storage_t store;
112
113 public:
114   RequestStorage() = default;
115   size_t size() const { return store.size(); }
116
117   req_storage_t& get_store() { return store; }
118
119   void get_requests(std::vector<MPI_Request>& vec) const
120   {
121     for (auto const& pair : store) {
122       auto& reqs       = pair.second;
123       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
124       for (auto& req: reqs){
125         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
126           vec.push_back(req);
127           req->print_request("MM");
128         }
129       }
130     }
131   }
132
133   MPI_Request pop(int src, int dst, int tag)
134   {
135     auto it = store.find(req_key_t(src, dst, tag));
136     if (it == store.end())
137      return MPI_REQUEST_NULL;
138     MPI_Request req = it->second.front();
139     it->second.pop_front();
140     if(it->second.empty())
141       store.erase(req_key_t(src, dst, tag));
142     return req;
143   }
144
145   void add(MPI_Request req)
146   {
147     if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
148       auto it = store.find(req_key_t(req->src()-1, req->dst()-1, req->tag()));
149       if (it == store.end())
150         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), std::list<MPI_Request>()});
151       store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
152     }
153   }
154
155   /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
156   void addNullRequest(int src, int dst, int tag)
157   {
158     int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
159     int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
160     auto it = store.find(req_key_t(src_pid, dest_pid, tag));
161     if (it == store.end())
162       store.insert({req_key_t(src_pid, dest_pid, tag), std::list<MPI_Request>()});
163     store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
164   }
165 };
166
167 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
168 {
169   CHECK_ACTION_PARAMS(action, 3, 0)
170   src = std::stoi(action[2]);
171   dst = std::stoi(action[3]);
172   tag = std::stoi(action[4]);
173 }
174
175 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 {
177   CHECK_ACTION_PARAMS(action, 3, 1)
178   partner = std::stoi(action[2]);
179   tag     = std::stoi(action[3]);
180   size      = parse_integer<size_t>(action[4]);
181   datatype1 = parse_datatype(action, 5);
182 }
183
184 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 {
186   CHECK_ACTION_PARAMS(action, 1, 0)
187   flops = parse_double(action[2]);
188 }
189
190 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 {
192   CHECK_ACTION_PARAMS(action, 1, 0)
193   time = parse_double(action[2]);
194 }
195
196 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
197 {
198   CHECK_ACTION_PARAMS(action, 2, 0)
199   filename = std::string(action[2]);
200   line = std::stoi(action[3]);
201 }
202
203 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
204 {
205   CHECK_ACTION_PARAMS(action, 1, 2)
206   size      = parse_integer<size_t>(action[2]);
207   root      = parse_root(action, 3);
208   datatype1 = parse_datatype(action, 4);
209 }
210
211 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
212 {
213   CHECK_ACTION_PARAMS(action, 2, 2)
214   comm_size = parse_integer<unsigned>(action[2]);
215   comp_size = parse_double(action[3]);
216   root      = parse_root(action, 4);
217   datatype1 = parse_datatype(action, 5);
218 }
219
220 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
221 {
222   CHECK_ACTION_PARAMS(action, 2, 1)
223   comm_size = parse_integer<unsigned>(action[2]);
224   comp_size = parse_double(action[3]);
225   datatype1 = parse_datatype(action, 4);
226 }
227
228 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
229 {
230   CHECK_ACTION_PARAMS(action, 2, 1)
231   comm_size = MPI_COMM_WORLD->size();
232   send_size = parse_integer<int>(action[2]);
233   recv_size = parse_integer<int>(action[3]);
234   datatype1 = parse_datatype(action, 4);
235   datatype2 = parse_datatype(action, 5);
236 }
237
238 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
239 {
240   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
241         0 gather 68 68 0 0 0
242       where:
243         1) 68 is the sendcounts
244         2) 68 is the recvcounts
245         3) 0 is the root node
246         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
247         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
248   */
249   CHECK_ACTION_PARAMS(action, 2, 3)
250   comm_size = MPI_COMM_WORLD->size();
251   send_size = parse_integer<int>(action[2]);
252   recv_size = parse_integer<int>(action[3]);
253
254   if (name == "gather") {
255     root      = parse_root(action, 4);
256     datatype1 = parse_datatype(action, 5);
257     datatype2 = parse_datatype(action, 6);
258   } else {
259     root      = 0;
260     datatype1 = parse_datatype(action, 4);
261     datatype2 = parse_datatype(action, 5);
262   }
263 }
264
265 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
266 {
267   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
268        0 gather 68 68 10 10 10 0 0 0
269      where:
270        1) 68 is the sendcount
271        2) 68 10 10 10 is the recvcounts
272        3) 0 is the root node
273        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
274        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
275   */
276   comm_size = MPI_COMM_WORLD->size();
277   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
278   send_size  = parse_integer<int>(action[2]);
279   disps      = std::vector<int>(comm_size, 0);
280   recvcounts = std::make_shared<std::vector<int>>(comm_size);
281
282   if (name == "gatherv") {
283     root      = parse_root(action, 3 + comm_size);
284     datatype1 = parse_datatype(action, 4 + comm_size);
285     datatype2 = parse_datatype(action, 5 + comm_size);
286   } else {
287     root                = 0;
288     unsigned disp_index = 0;
289     /* The 3 comes from "0 gather <sendcount>", which must always be present.
290      * The + comm_size is the recvcounts array, which must also be present
291      */
292     if (action.size() > 3 + comm_size + comm_size) {
293       // datatype + disp are specified
294       datatype1  = parse_datatype(action, 3 + comm_size);
295       datatype2  = parse_datatype(action, 4 + comm_size);
296       disp_index = 5 + comm_size;
297     } else if (action.size() > 3 + comm_size + 2) {
298       // disps specified; datatype is not specified; use the default one
299       datatype1  = MPI_DEFAULT_TYPE;
300       datatype2  = MPI_DEFAULT_TYPE;
301       disp_index = 3 + comm_size;
302     } else {
303       // no disp specified, maybe only datatype,
304       datatype1 = parse_datatype(action, 3 + comm_size);
305       datatype2 = parse_datatype(action, 4 + comm_size);
306     }
307
308     if (disp_index != 0) {
309       xbt_assert(disp_index + comm_size <= action.size());
310       for (unsigned i = 0; i < comm_size; i++)
311         disps[i]          = std::stoi(action[disp_index + i]);
312     }
313   }
314
315   for (unsigned int i = 0; i < comm_size; i++) {
316     (*recvcounts)[i] = std::stoi(action[i + 3]);
317   }
318   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
319 }
320
321 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
322 {
323   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
324         0 gather 68 68 0 0 0
325       where:
326         1) 68 is the sendcounts
327         2) 68 is the recvcounts
328         3) 0 is the root node
329         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
330         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
331   */
332   comm_size = MPI_COMM_WORLD->size();
333   CHECK_ACTION_PARAMS(action, 2, 3)
334   comm_size = MPI_COMM_WORLD->size();
335   send_size = parse_integer<int>(action[2]);
336   recv_size = parse_integer<int>(action[3]);
337   root      = parse_root(action, 4);
338   datatype1 = parse_datatype(action, 5);
339   datatype2 = parse_datatype(action, 6);
340 }
341
342 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
343 {
344   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
345      0 gather 68 10 10 10 68 0 0 0
346       where:
347       1) 68 10 10 10 is the sendcounts
348       2) 68 is the recvcount
349       3) 0 is the root node
350       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
351       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
352   */
353   comm_size = MPI_COMM_WORLD->size();
354   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
355   recv_size  = parse_integer<int>(action[2 + comm_size]);
356   disps      = std::vector<int>(comm_size, 0);
357   sendcounts = std::make_shared<std::vector<int>>(comm_size);
358
359   root      = parse_root(action, 3 + comm_size);
360   datatype1 = parse_datatype(action, 4 + comm_size);
361   datatype2 = parse_datatype(action, 5 + comm_size);
362
363   for (unsigned int i = 0; i < comm_size; i++) {
364     (*sendcounts)[i] = std::stoi(action[i + 2]);
365   }
366   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
367 }
368
369 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
370 {
371   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
372        0 reducescatter 275427 275427 275427 204020 11346849 0
373      where:
374        1) The first four values after the name of the action declare the recvcounts array
375        2) The value 11346849 is the amount of instructions
376        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
377   */
378   comm_size = MPI_COMM_WORLD->size();
379   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
380   comp_size  = parse_double(action[2 + comm_size]);
381   recvcounts = std::make_shared<std::vector<int>>(comm_size);
382   datatype1  = parse_datatype(action, 3 + comm_size);
383
384   for (unsigned int i = 0; i < comm_size; i++) {
385     (*recvcounts)[i]= std::stoi(action[i + 2]);
386   }
387   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
388 }
389
390 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
391 {
392   CHECK_ACTION_PARAMS(action, 2, 1)
393   size      = parse_integer<size_t>(action[2]);
394   comp_size = parse_double(action[3]);
395   datatype1 = parse_datatype(action, 4);
396 }
397
398 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
399 {
400   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
401         0 alltoallv 100 1 7 10 12 100 1 70 10 5
402      where:
403       1) 100 is the size of the send buffer *sizeof(int),
404       2) 1 7 10 12 is the sendcounts array
405       3) 100*sizeof(int) is the size of the receiver buffer
406       4)  1 70 10 5 is the recvcounts array
407   */
408   comm_size = MPI_COMM_WORLD->size();
409   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
410   sendcounts = std::make_shared<std::vector<int>>(comm_size);
411   recvcounts = std::make_shared<std::vector<int>>(comm_size);
412   senddisps  = std::vector<int>(comm_size, 0);
413   recvdisps  = std::vector<int>(comm_size, 0);
414
415   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
416   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
417
418   send_buf_size = parse_integer<int>(action[2]);
419   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
420   for (unsigned int i = 0; i < comm_size; i++) {
421     (*sendcounts)[i] = std::stoi(action[3 + i]);
422     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
423   }
424   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
425   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
426 }
427
428 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
429 {
430   std::string s = boost::algorithm::join(action, " ");
431   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
432   const WaitTestParser& args = get_args();
433   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
434
435   if (request == MPI_REQUEST_NULL) {
436     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
437      * return.*/
438     return;
439   }
440
441   // Must be taken before Request::wait() since the request may be set to
442   // MPI_REQUEST_NULL by Request::wait!
443   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
444
445   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
446
447   MPI_Status status;
448   Request::wait(&request, &status);
449   if(request!=MPI_REQUEST_NULL)
450     Request::unref(&request);
451   TRACE_smpi_comm_out(get_pid());
452   if (is_wait_for_receive)
453     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
454 }
455
456 void SendAction::kernel(simgrid::xbt::ReplayAction&)
457 {
458   const SendRecvParser& args = get_args();
459   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
460
461   TRACE_smpi_comm_in(
462       get_pid(), __func__,
463       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
464   if (not TRACE_smpi_view_internals())
465     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
466
467   if (get_name() == "send") {
468     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
469   } else if (get_name() == "isend") {
470     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
471     req_storage.add(request);
472   } else {
473     xbt_die("Don't know this action, %s", get_name().c_str());
474   }
475
476   TRACE_smpi_comm_out(get_pid());
477 }
478
479 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
480 {
481   const SendRecvParser& args = get_args();
482   TRACE_smpi_comm_in(
483       get_pid(), __func__,
484       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
485
486   MPI_Status status;
487   // unknown size from the receiver point of view
488   size_t arg_size = args.size;
489   if (arg_size == 0) {
490     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
491     arg_size = status.count;
492   }
493
494   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
495   if (get_name() == "recv") {
496     is_recv = true;
497     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
498   } else if (get_name() == "irecv") {
499     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
500     req_storage.add(request);
501   } else {
502     THROW_IMPOSSIBLE;
503   }
504
505   TRACE_smpi_comm_out(get_pid());
506   if (is_recv && not TRACE_smpi_view_internals()) {
507     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
508     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
509   }
510 }
511
512 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
513 {
514   const ComputeParser& args = get_args();
515   if (smpi_cfg_simulate_computation()) {
516     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
517   }
518 }
519
520 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
521 {
522   const SleepParser& args = get_args();
523   XBT_DEBUG("Sleep for: %lf secs", args.time);
524   aid_t pid = simgrid::s4u::this_actor::get_pid();
525   TRACE_smpi_sleeping_in(pid, args.time);
526   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
527   TRACE_smpi_sleeping_out(pid);
528 }
529
530 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
531 {
532   const LocationParser& args = get_args();
533   smpi_trace_set_call_location(args.filename.c_str(), args.line);
534 }
535
536 void TestAction::kernel(simgrid::xbt::ReplayAction&)
537 {
538   const WaitTestParser& args = get_args();
539   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
540   // if request is null here, this may mean that a previous test has succeeded
541   // Different times in traced application and replayed version may lead to this
542   // In this case, ignore the extra calls.
543   if (request != MPI_REQUEST_NULL) {
544     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
545
546     MPI_Status status;
547     int flag = 0;
548     Request::test(&request, &status, &flag);
549
550     XBT_DEBUG("MPI_Test result: %d", flag);
551     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
552      * nullptr.*/
553     if (request == MPI_REQUEST_NULL)
554       req_storage.addNullRequest(args.src, args.dst, args.tag);
555     else
556       req_storage.add(request);
557
558     TRACE_smpi_comm_out(get_pid());
559   }
560 }
561
562 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
563 {
564   CHECK_ACTION_PARAMS(action, 0, 1)
565     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
566     : MPI_BYTE;  // default TAU datatype
567
568   /* start a simulated timer */
569   smpi_process()->simulated_start();
570 }
571
572 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
573 {
574   /* nothing to do */
575 }
576
577 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
578 {
579   if (req_storage.size() > 0) {
580     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
581     std::vector<MPI_Request> reqs;
582     req_storage.get_requests(reqs);
583     unsigned long count_requests = reqs.size();
584     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
585     for (auto const& req : reqs) {
586       if (req && (req->flags() & MPI_REQ_RECV)) {
587         sender_receiver.emplace_back(req->src(), req->dst());
588       }
589     }
590     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
591     req_storage.get_store().clear();
592
593     for (MPI_Request& req : reqs)
594       if (req != MPI_REQUEST_NULL)
595         Request::unref(&req);
596
597     for (auto const& pair : sender_receiver) {
598       TRACE_smpi_recv(pair.first, pair.second, 0);
599     }
600     TRACE_smpi_comm_out(get_pid());
601   }
602 }
603
604 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
605 {
606   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
607   colls::barrier(MPI_COMM_WORLD);
608   TRACE_smpi_comm_out(get_pid());
609 }
610
611 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
612 {
613   const BcastArgParser& args = get_args();
614   TRACE_smpi_comm_in(get_pid(), "action_bcast",
615                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
616                                                     0, Datatype::encode(args.datatype1), ""));
617
618   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
619
620   TRACE_smpi_comm_out(get_pid());
621 }
622
623 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
624 {
625   const ReduceArgParser& args = get_args();
626   TRACE_smpi_comm_in(get_pid(), "action_reduce",
627                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
628                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
629
630   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
631                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
632                 args.root, MPI_COMM_WORLD);
633   if (args.comp_size != 0.0)
634     simgrid::s4u::this_actor::exec_init(args.comp_size)
635       ->set_name("computation")
636       ->start()
637       ->wait();
638
639   TRACE_smpi_comm_out(get_pid());
640 }
641
642 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
643 {
644   const AllReduceArgParser& args = get_args();
645   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
646                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
647                                                     Datatype::encode(args.datatype1), ""));
648
649   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
650                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
651                    MPI_COMM_WORLD);
652   if (args.comp_size != 0.0)
653     simgrid::s4u::this_actor::exec_init(args.comp_size)
654       ->set_name("computation")
655       ->start()
656       ->wait();
657
658   TRACE_smpi_comm_out(get_pid());
659 }
660
661 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
662 {
663   const AllToAllArgParser& args = get_args();
664   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
665                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
666                                                     Datatype::encode(args.datatype1),
667                                                     Datatype::encode(args.datatype2)));
668
669   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
670                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
671                   MPI_COMM_WORLD);
672
673   TRACE_smpi_comm_out(get_pid());
674 }
675
676 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
677 {
678   const GatherArgParser& args = get_args();
679   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
680                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
681                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
682                                                     Datatype::encode(args.datatype2)));
683
684   if (get_name() == "gather") {
685     int rank = MPI_COMM_WORLD->rank();
686     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
687                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
688                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
689   } else
690     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
691                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
692                      MPI_COMM_WORLD);
693
694   TRACE_smpi_comm_out(get_pid());
695 }
696
697 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
698 {
699   int rank = MPI_COMM_WORLD->rank();
700   const GatherVArgParser& args = get_args();
701   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
702                      new simgrid::instr::VarCollTIData(
703                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
704                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
705
706   if (get_name() == "gatherv") {
707     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
708                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
709                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
710   } else {
711     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
712                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
713                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
714   }
715
716   TRACE_smpi_comm_out(get_pid());
717 }
718
719 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
720 {
721   int rank = MPI_COMM_WORLD->rank();
722   const ScatterArgParser& args = get_args();
723   TRACE_smpi_comm_in(get_pid(), "action_scatter",
724                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
725                                                     Datatype::encode(args.datatype1),
726                                                     Datatype::encode(args.datatype2)));
727
728   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
729                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
730                  args.datatype2, args.root, MPI_COMM_WORLD);
731
732   TRACE_smpi_comm_out(get_pid());
733 }
734
735 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
736 {
737   int rank = MPI_COMM_WORLD->rank();
738   const ScatterVArgParser& args = get_args();
739   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
740                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
741                                                        nullptr, Datatype::encode(args.datatype1),
742                                                        Datatype::encode(args.datatype2)));
743
744   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
745                   args.sendcounts->data(), args.disps.data(), args.datatype1,
746                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
747                   MPI_COMM_WORLD);
748
749   TRACE_smpi_comm_out(get_pid());
750 }
751
752 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
753 {
754   const ReduceScatterArgParser& args = get_args();
755   TRACE_smpi_comm_in(
756       get_pid(), "action_reducescatter",
757       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
758                                         /* ugly as we use datatype field to pass computation as string */
759                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
760                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
761                                         Datatype::encode(args.datatype1)));
762
763   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
764                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
765                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
766   if (args.comp_size != 0.0)
767     simgrid::s4u::this_actor::exec_init(args.comp_size)
768       ->set_name("computation")
769       ->start()
770       ->wait();
771   TRACE_smpi_comm_out(get_pid());
772 }
773
774 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
775 {
776   const ScanArgParser& args = get_args();
777   TRACE_smpi_comm_in(get_pid(), "action_scan",
778                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
779                      args.size, 0, Datatype::encode(args.datatype1), ""));
780   if (get_name() == "scan")
781     colls::scan(send_buffer(args.size * args.datatype1->size()),
782               recv_buffer(args.size * args.datatype1->size()), args.size,
783               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
784   else
785     colls::exscan(send_buffer(args.size * args.datatype1->size()),
786               recv_buffer(args.size * args.datatype1->size()), args.size,
787               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
788
789   if (args.comp_size != 0.0)
790     simgrid::s4u::this_actor::exec_init(args.comp_size)
791       ->set_name("computation")
792       ->start()
793       ->wait();
794   TRACE_smpi_comm_out(get_pid());
795 }
796
797 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
798 {
799   const AllToAllVArgParser& args = get_args();
800   TRACE_smpi_comm_in(get_pid(), __func__,
801                      new simgrid::instr::VarCollTIData(
802                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
803                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
804
805   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
806                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
807                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
808
809   TRACE_smpi_comm_out(get_pid());
810 }
811 } // Replay Namespace
812 }} // namespace simgrid::smpi
813
814 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
815 /** @brief Only initialize the replay, don't do it for real */
816 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
817 {
818   xbt_assert(not smpi_process()->initializing());
819
820   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
821   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
822   simgrid::smpi::ActorExt::init();
823
824   smpi_process()->mark_as_initialized();
825   smpi_process()->set_replaying(true);
826
827   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
828   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
829   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
830   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
831   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
832   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
833   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
834   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
835   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
836   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
837   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
838   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
839   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
840   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
841   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
842   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
843   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
844   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
845   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
846   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
847   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
848   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
849   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
850   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
851   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
852   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
853   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
854   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
855   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
856   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
857   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
858
859   //if we have a delayed start, sleep here.
860   if (start_delay_flops > 0) {
861     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
862     private_execute_flops(start_delay_flops);
863   } else {
864     // Wait for the other actors to initialize also
865     simgrid::s4u::this_actor::yield();
866   }
867   if(_smpi_init_sleep > 0)
868     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
869 }
870
871 /** @brief actually run the replay after initialization */
872 void smpi_replay_main(int rank, const char* private_trace_filename)
873 {
874   static int active_processes = 0;
875   active_processes++;
876   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
877   std::string rank_string                      = std::to_string(rank);
878   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
879
880   /* and now, finalize everything */
881   /* One active process will stop. Decrease the counter*/
882   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
883   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
884   if (count_requests > 0) {
885     std::vector<MPI_Request> requests(count_requests);
886     unsigned int i=0;
887
888     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
889       for (auto& req: pair.second){
890         requests[i] = req;
891       }
892       i++;
893     }
894     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
895   }
896
897   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
898     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
899
900   active_processes--;
901
902   if(active_processes==0){
903     /* Last process alive speaking: end the simulated timer */
904     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
905     smpi_free_replay_tmp_buffers();
906   }
907
908   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
909                      new simgrid::instr::NoOpTIData("finalize"));
910
911   smpi_process()->finalize();
912
913   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
914 }
915
916 /** @brief chain a replay initialization and a replay start */
917 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
918 {
919   smpi_replay_init(instance_id, rank, start_delay_flops);
920   smpi_replay_main(rank, private_trace_filename);
921 }