Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill smpi::Group::actor(int).
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
65 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92   RequestStorage() = default;
93   int size() const { return store.size(); }
94
95   req_storage_t& get_store() { return store; }
96
97   void get_requests(std::vector<MPI_Request>& vec) const
98   {
99     for (auto const& pair : store) {
100       auto& req       = pair.second;
101       auto my_proc_id = simgrid::s4u::this_actor::get_pid();
102       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
103         vec.push_back(pair.second);
104         pair.second->print_request("MM");
105       }
106     }
107   }
108
109     MPI_Request find(int src, int dst, int tag)
110     {
111       auto it = store.find(req_key_t(src, dst, tag));
112       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
113     }
114
115     void remove(const Request* req)
116     {
117       if (req == MPI_REQUEST_NULL) return;
118
119       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
120     }
121
122     void add(MPI_Request req)
123     {
124       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
125         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
126     }
127
128     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
129     void addNullRequest(int src, int dst, int tag)
130     {
131       store.insert(
132           {req_key_t(MPI_COMM_WORLD->group()->actor_pid(src) - 1, MPI_COMM_WORLD->group()->actor_pid(dst) - 1, tag),
133            MPI_REQUEST_NULL});
134     }
135 };
136
137 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
138 {
139   CHECK_ACTION_PARAMS(action, 3, 0)
140   src = std::stoi(action[2]);
141   dst = std::stoi(action[3]);
142   tag = std::stoi(action[4]);
143 }
144
145 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 {
147   CHECK_ACTION_PARAMS(action, 3, 1)
148   partner = std::stoi(action[2]);
149   tag     = std::stoi(action[3]);
150   size    = parse_double(action[4]);
151   if (action.size() > 5)
152     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
153 }
154
155 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
156 {
157   CHECK_ACTION_PARAMS(action, 1, 0)
158   flops = parse_double(action[2]);
159 }
160
161 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
162 {
163   CHECK_ACTION_PARAMS(action, 1, 0)
164   time = parse_double(action[2]);
165 }
166
167 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
168 {
169   CHECK_ACTION_PARAMS(action, 2, 0)
170   filename = std::string(action[2]);
171   line = std::stoi(action[3]);
172 }
173
174 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 1, 2)
177   size = parse_double(action[2]);
178   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
179   if (action.size() > 4)
180     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
181 }
182
183 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
184 {
185   CHECK_ACTION_PARAMS(action, 2, 2)
186   comm_size = parse_double(action[2]);
187   comp_size = parse_double(action[3]);
188   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
189   if (action.size() > 5)
190     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
191 }
192
193 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 {
195   CHECK_ACTION_PARAMS(action, 2, 1)
196   comm_size = parse_double(action[2]);
197   comp_size = parse_double(action[3]);
198   if (action.size() > 4)
199     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
200 }
201
202 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
203 {
204   CHECK_ACTION_PARAMS(action, 2, 1)
205   comm_size = MPI_COMM_WORLD->size();
206   send_size = parse_double(action[2]);
207   recv_size = parse_double(action[3]);
208
209   if (action.size() > 4)
210     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
211   if (action.size() > 5)
212     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
213 }
214
215 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
216 {
217   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
218         0 gather 68 68 0 0 0
219       where:
220         1) 68 is the sendcounts
221         2) 68 is the recvcounts
222         3) 0 is the root node
223         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
224         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
225   */
226   CHECK_ACTION_PARAMS(action, 2, 3)
227   comm_size = MPI_COMM_WORLD->size();
228   send_size = parse_double(action[2]);
229   recv_size = parse_double(action[3]);
230
231   if (name == "gather") {
232     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
233     if (action.size() > 5)
234       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
235     if (action.size() > 6)
236       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
237   } else {
238     if (action.size() > 4)
239       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
240     if (action.size() > 5)
241       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
242   }
243 }
244
245 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
246 {
247   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
248        0 gather 68 68 10 10 10 0 0 0
249      where:
250        1) 68 is the sendcount
251        2) 68 10 10 10 is the recvcounts
252        3) 0 is the root node
253        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
254        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
255   */
256   comm_size = MPI_COMM_WORLD->size();
257   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
258   send_size  = parse_double(action[2]);
259   disps      = std::vector<int>(comm_size, 0);
260   recvcounts = std::make_shared<std::vector<int>>(comm_size);
261
262   if (name == "gatherv") {
263     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
264     if (action.size() > 4 + comm_size)
265       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
266     if (action.size() > 5 + comm_size)
267       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
268   } else {
269     int disp_index     = 0;
270     /* The 3 comes from "0 gather <sendcount>", which must always be present.
271      * The + comm_size is the recvcounts array, which must also be present
272      */
273     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
274       int datatype_index = 3 + comm_size;
275       disp_index     = datatype_index + 1;
276       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
277       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
278     } else if (action.size() >
279                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
280       disp_index = 3 + comm_size;
281     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
282       int datatype_index = 3 + comm_size;
283       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
284       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
285     }
286
287     if (disp_index != 0) {
288       for (unsigned int i = 0; i < comm_size; i++)
289         disps[i]          = std::stoi(action[disp_index + i]);
290     }
291   }
292
293   for (unsigned int i = 0; i < comm_size; i++) {
294     (*recvcounts)[i] = std::stoi(action[i + 3]);
295   }
296   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
297 }
298
299 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
300 {
301   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
302         0 gather 68 68 0 0 0
303       where:
304         1) 68 is the sendcounts
305         2) 68 is the recvcounts
306         3) 0 is the root node
307         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
308         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
309   */
310   CHECK_ACTION_PARAMS(action, 2, 3)
311   comm_size = MPI_COMM_WORLD->size();
312   send_size = parse_double(action[2]);
313   recv_size = parse_double(action[3]);
314   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
315   if (action.size() > 5)
316     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
317   if (action.size() > 6)
318     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
319 }
320
321 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
322 {
323   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
324      0 gather 68 10 10 10 68 0 0 0
325       where:
326       1) 68 10 10 10 is the sendcounts
327       2) 68 is the recvcount
328       3) 0 is the root node
329       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
330       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
331   */
332   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
333   recv_size  = parse_double(action[2 + comm_size]);
334   disps      = std::vector<int>(comm_size, 0);
335   sendcounts = std::make_shared<std::vector<int>>(comm_size);
336
337   if (action.size() > 5 + comm_size)
338     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
339   if (action.size() > 5 + comm_size)
340     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
341
342   for (unsigned int i = 0; i < comm_size; i++) {
343     (*sendcounts)[i] = std::stoi(action[i + 2]);
344   }
345   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
346   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
347 }
348
349 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
350 {
351   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
352        0 reducescatter 275427 275427 275427 204020 11346849 0
353      where:
354        1) The first four values after the name of the action declare the recvcounts array
355        2) The value 11346849 is the amount of instructions
356        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
357   */
358   comm_size = MPI_COMM_WORLD->size();
359   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
360   comp_size  = parse_double(action[2 + comm_size]);
361   recvcounts = std::make_shared<std::vector<int>>(comm_size);
362   if (action.size() > 3 + comm_size)
363     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
364
365   for (unsigned int i = 0; i < comm_size; i++) {
366     recvcounts->push_back(std::stoi(action[i + 2]));
367   }
368   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
369 }
370
371 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
372 {
373   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
374         0 alltoallv 100 1 7 10 12 100 1 70 10 5
375      where:
376       1) 100 is the size of the send buffer *sizeof(int),
377       2) 1 7 10 12 is the sendcounts array
378       3) 100*sizeof(int) is the size of the receiver buffer
379       4)  1 70 10 5 is the recvcounts array
380   */
381   comm_size = MPI_COMM_WORLD->size();
382   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
383   sendcounts = std::make_shared<std::vector<int>>(comm_size);
384   recvcounts = std::make_shared<std::vector<int>>(comm_size);
385   senddisps  = std::vector<int>(comm_size, 0);
386   recvdisps  = std::vector<int>(comm_size, 0);
387
388   if (action.size() > 5 + 2 * comm_size)
389     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
390   if (action.size() > 5 + 2 * comm_size)
391     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
392
393   send_buf_size = parse_double(action[2]);
394   recv_buf_size = parse_double(action[3 + comm_size]);
395   for (unsigned int i = 0; i < comm_size; i++) {
396     (*sendcounts)[i] = std::stoi(action[3 + i]);
397     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
398   }
399   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
400   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
401 }
402
403 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
404 {
405   std::string s = boost::algorithm::join(action, " ");
406   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
407   const WaitTestParser& args = get_args();
408   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
409   req_storage.remove(request);
410
411   if (request == MPI_REQUEST_NULL) {
412     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
413      * return.*/
414     return;
415   }
416
417   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
418
419   // Must be taken before Request::wait() since the request may be set to
420   // MPI_REQUEST_NULL by Request::wait!
421   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
422   // TODO: Here we take the rank while we normally take the process id (look for get_pid())
423   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
424
425   MPI_Status status;
426   Request::wait(&request, &status);
427
428   TRACE_smpi_comm_out(rank);
429   if (is_wait_for_receive)
430     TRACE_smpi_recv(args.src, args.dst, args.tag);
431 }
432
433 void SendAction::kernel(simgrid::xbt::ReplayAction&)
434 {
435   const SendRecvParser& args = get_args();
436   int dst_traced             = MPI_COMM_WORLD->group()->actor_pid(args.partner);
437
438   TRACE_smpi_comm_in(
439       get_pid(), __func__,
440       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
441   if (not TRACE_smpi_view_internals())
442     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
443
444   if (get_name() == "send") {
445     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
446   } else if (get_name() == "isend") {
447     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
448     req_storage.add(request);
449   } else {
450     xbt_die("Don't know this action, %s", get_name().c_str());
451   }
452
453   TRACE_smpi_comm_out(get_pid());
454 }
455
456 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
457 {
458   const SendRecvParser& args = get_args();
459   TRACE_smpi_comm_in(
460       get_pid(), __func__,
461       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
462
463   MPI_Status status;
464   // unknown size from the receiver point of view
465   double arg_size = args.size;
466   if (arg_size <= 0.0) {
467     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
468     arg_size = status.count;
469   }
470
471   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
472   if (get_name() == "recv") {
473     is_recv = true;
474     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
475   } else if (get_name() == "irecv") {
476     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
477     req_storage.add(request);
478   } else {
479     THROW_IMPOSSIBLE;
480   }
481
482   TRACE_smpi_comm_out(get_pid());
483   if (is_recv && not TRACE_smpi_view_internals()) {
484     int src_traced = MPI_COMM_WORLD->group()->actor_pid(status.MPI_SOURCE);
485     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
486   }
487 }
488
489 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
490 {
491   const ComputeParser& args = get_args();
492   if (smpi_cfg_simulate_computation()) {
493     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
494   }
495 }
496
497 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
498 {
499   const SleepParser& args = get_args();
500   XBT_DEBUG("Sleep for: %lf secs", args.time);
501   int rank = simgrid::s4u::this_actor::get_pid();
502   TRACE_smpi_sleeping_in(rank, args.time);
503   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
504   TRACE_smpi_sleeping_out(rank);
505 }
506
507 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
508 {
509   const LocationParser& args = get_args();
510   smpi_trace_set_call_location(args.filename.c_str(), args.line);
511 }
512
513 void TestAction::kernel(simgrid::xbt::ReplayAction&)
514 {
515   const WaitTestParser& args = get_args();
516   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
517   req_storage.remove(request);
518   // if request is null here, this may mean that a previous test has succeeded
519   // Different times in traced application and replayed version may lead to this
520   // In this case, ignore the extra calls.
521   if (request != MPI_REQUEST_NULL) {
522     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
523
524     MPI_Status status;
525     int flag = 0;
526     Request::test(&request, &status, &flag);
527
528     XBT_DEBUG("MPI_Test result: %d", flag);
529     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
530      * nullptr.*/
531     if (request == MPI_REQUEST_NULL)
532       req_storage.addNullRequest(args.src, args.dst, args.tag);
533     else
534       req_storage.add(request);
535
536     TRACE_smpi_comm_out(get_pid());
537   }
538 }
539
540 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
541 {
542   CHECK_ACTION_PARAMS(action, 0, 1)
543     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
544     : MPI_BYTE;  // default TAU datatype
545
546   /* start a simulated timer */
547   smpi_process()->simulated_start();
548 }
549
550 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
551 {
552   /* nothing to do */
553 }
554
555 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
556 {
557   const unsigned int count_requests = req_storage.size();
558
559   if (count_requests > 0) {
560     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
561     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
562     std::vector<MPI_Request> reqs;
563     req_storage.get_requests(reqs);
564     for (auto const& req : reqs) {
565       if (req && (req->flags() & MPI_REQ_RECV)) {
566         sender_receiver.emplace_back(req->src(), req->dst());
567       }
568     }
569     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
570     req_storage.get_store().clear();
571
572     for (auto const& pair : sender_receiver) {
573       TRACE_smpi_recv(pair.first, pair.second, 0);
574     }
575     TRACE_smpi_comm_out(get_pid());
576   }
577 }
578
579 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
580 {
581   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
582   colls::barrier(MPI_COMM_WORLD);
583   TRACE_smpi_comm_out(get_pid());
584 }
585
586 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
587 {
588   const BcastArgParser& args = get_args();
589   TRACE_smpi_comm_in(get_pid(), "action_bcast",
590                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor_pid(args.root), -1.0,
591                                                     args.size, -1, Datatype::encode(args.datatype1), ""));
592
593   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
594
595   TRACE_smpi_comm_out(get_pid());
596 }
597
598 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
599 {
600   const ReduceArgParser& args = get_args();
601   TRACE_smpi_comm_in(get_pid(), "action_reduce",
602                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor_pid(args.root),
603                                                     args.comp_size, args.comm_size, -1,
604                                                     Datatype::encode(args.datatype1), ""));
605
606   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
607                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
608                 args.root, MPI_COMM_WORLD);
609   private_execute_flops(args.comp_size);
610
611   TRACE_smpi_comm_out(get_pid());
612 }
613
614 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
615 {
616   const AllReduceArgParser& args = get_args();
617   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
618                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
619                                                     Datatype::encode(args.datatype1), ""));
620
621   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
622                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
623                    MPI_COMM_WORLD);
624   private_execute_flops(args.comp_size);
625
626   TRACE_smpi_comm_out(get_pid());
627 }
628
629 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
630 {
631   const AllToAllArgParser& args = get_args();
632   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
633                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
634                                                     Datatype::encode(args.datatype1),
635                                                     Datatype::encode(args.datatype2)));
636
637   colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
638                   recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
639                   MPI_COMM_WORLD);
640
641   TRACE_smpi_comm_out(get_pid());
642 }
643
644 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
645 {
646   const GatherArgParser& args = get_args();
647   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
648                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
649                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
650                                                     Datatype::encode(args.datatype2)));
651
652   if (get_name() == "gather") {
653     int rank = MPI_COMM_WORLD->rank();
654     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
655                   (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
656                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
657   } else
658     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
659                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
660                      MPI_COMM_WORLD);
661
662   TRACE_smpi_comm_out(get_pid());
663 }
664
665 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
666 {
667   int rank = MPI_COMM_WORLD->rank();
668   const GatherVArgParser& args = get_args();
669   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
670                      new simgrid::instr::VarCollTIData(
671                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
672                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
673
674   if (get_name() == "gatherv") {
675     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
676                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
677                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
678   } else {
679     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
681                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
682   }
683
684   TRACE_smpi_comm_out(get_pid());
685 }
686
687 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
688 {
689   int rank = MPI_COMM_WORLD->rank();
690   const ScatterArgParser& args = get_args();
691   TRACE_smpi_comm_in(get_pid(), "action_scatter",
692                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
693                                                     Datatype::encode(args.datatype1),
694                                                     Datatype::encode(args.datatype2)));
695
696   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
698                  args.datatype2, args.root, MPI_COMM_WORLD);
699
700   TRACE_smpi_comm_out(get_pid());
701 }
702
703 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
704 {
705   int rank = MPI_COMM_WORLD->rank();
706   const ScatterVArgParser& args = get_args();
707   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
708                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
709                                                        nullptr, Datatype::encode(args.datatype1),
710                                                        Datatype::encode(args.datatype2)));
711
712   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
713                   args.sendcounts->data(), args.disps.data(), args.datatype1,
714                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
715                   MPI_COMM_WORLD);
716
717   TRACE_smpi_comm_out(get_pid());
718 }
719
720 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
721 {
722   const ReduceScatterArgParser& args = get_args();
723   TRACE_smpi_comm_in(
724       get_pid(), "action_reducescatter",
725       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
726                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
727                                         Datatype::encode(args.datatype1)));
728
729   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
730                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
731                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
732
733   private_execute_flops(args.comp_size);
734   TRACE_smpi_comm_out(get_pid());
735 }
736
737 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
738 {
739   const AllToAllVArgParser& args = get_args();
740   TRACE_smpi_comm_in(get_pid(), __func__,
741                      new simgrid::instr::VarCollTIData(
742                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
743                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
744
745   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
746                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
747                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
748
749   TRACE_smpi_comm_out(get_pid());
750 }
751 } // Replay Namespace
752 }} // namespace simgrid::smpi
753
754 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
755 /** @brief Only initialize the replay, don't do it for real */
756 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
757 {
758   xbt_assert(not smpi_process()->initializing());
759
760   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
761   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
762   simgrid::smpi::ActorExt::init();
763
764   smpi_process()->mark_as_initialized();
765   smpi_process()->set_replaying(true);
766
767   int my_proc_id = simgrid::s4u::this_actor::get_pid();
768
769   TRACE_smpi_init(my_proc_id, "smpi_replay_run_init");
770   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
771   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
772   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
773   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
776   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
783   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
784   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
785   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
786   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
787   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
788   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
789   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
790   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
791   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
792   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
793   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
794   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
795   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
796   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
797   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
798
799   //if we have a delayed start, sleep here.
800   if (start_delay_flops > 0) {
801     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
802     private_execute_flops(start_delay_flops);
803   } else {
804     // Wait for the other actors to initialize also
805     simgrid::s4u::this_actor::yield();
806   }
807 }
808
809 /** @brief actually run the replay after initialization */
810 void smpi_replay_main(int rank, const char* private_trace_filename)
811 {
812   static int active_processes = 0;
813   active_processes++;
814   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
815   std::string rank_string                      = std::to_string(rank);
816   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
817
818   /* and now, finalize everything */
819   /* One active process will stop. Decrease the counter*/
820   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
821   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
822   if (count_requests > 0) {
823     std::vector<MPI_Request> requests(count_requests);
824     unsigned int i=0;
825
826     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
827       requests[i] = pair.second;
828       i++;
829     }
830     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
831   }
832   active_processes--;
833
834   if(active_processes==0){
835     /* Last process alive speaking: end the simulated timer */
836     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
837     smpi_free_replay_tmp_buffers();
838   }
839
840   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
841                      new simgrid::instr::NoOpTIData("finalize"));
842
843   smpi_process()->finalize();
844
845   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
846 }
847
848 /** @brief chain a replay initialization and a replay start */
849 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
850 {
851   smpi_replay_init(instance_id, rank, start_delay_flops);
852   smpi_replay_main(rank, private_trace_filename);
853 }