Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[sonar] Replace redundant type with "auto" (src/smpi/).
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2020. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size() const { return store.size(); }
94
95     req_storage_t& get_store()
96     {
97       return store;
98     }
99
100     void get_requests(std::vector<MPI_Request>& vec) const
101     {
102       for (auto const& pair : store) {
103         auto& req = pair.second;
104         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
105         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
106           vec.push_back(pair.second);
107           pair.second->print_request("MM");
108         }
109       }
110     }
111
112     MPI_Request find(int src, int dst, int tag)
113     {
114       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
115       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
116     }
117
118     void remove(const Request* req)
119     {
120       if (req == MPI_REQUEST_NULL) return;
121
122       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
123     }
124
125     void add(MPI_Request req)
126     {
127       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
128         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
129     }
130
131     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
132     void addNullRequest(int src, int dst, int tag)
133     {
134       store.insert({req_key_t(
135             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
136             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
137             tag), MPI_REQUEST_NULL});
138     }
139 };
140
141 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
142 {
143   CHECK_ACTION_PARAMS(action, 3, 0)
144   src = std::stoi(action[2]);
145   dst = std::stoi(action[3]);
146   tag = std::stoi(action[4]);
147 }
148
149 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
150 {
151   CHECK_ACTION_PARAMS(action, 3, 1)
152   partner = std::stoi(action[2]);
153   tag     = std::stoi(action[3]);
154   size    = parse_double(action[4]);
155   if (action.size() > 5)
156     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
157 }
158
159 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
160 {
161   CHECK_ACTION_PARAMS(action, 1, 0)
162   flops = parse_double(action[2]);
163 }
164
165 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 {
167   CHECK_ACTION_PARAMS(action, 1, 0)
168   time = parse_double(action[2]);
169 }
170
171 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
172 {
173   CHECK_ACTION_PARAMS(action, 2, 0)
174   filename = std::string(action[2]);
175   line = std::stoi(action[3]);
176 }
177
178 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
179 {
180   CHECK_ACTION_PARAMS(action, 1, 2)
181   size = parse_double(action[2]);
182   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
183   if (action.size() > 4)
184     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
185 }
186
187 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
188 {
189   CHECK_ACTION_PARAMS(action, 2, 2)
190   comm_size = parse_double(action[2]);
191   comp_size = parse_double(action[3]);
192   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
193   if (action.size() > 5)
194     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
195 }
196
197 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
198 {
199   CHECK_ACTION_PARAMS(action, 2, 1)
200   comm_size = parse_double(action[2]);
201   comp_size = parse_double(action[3]);
202   if (action.size() > 4)
203     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
204 }
205
206 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
207 {
208   CHECK_ACTION_PARAMS(action, 2, 1)
209   comm_size = MPI_COMM_WORLD->size();
210   send_size = parse_double(action[2]);
211   recv_size = parse_double(action[3]);
212
213   if (action.size() > 4)
214     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
215   if (action.size() > 5)
216     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
217 }
218
219 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
220 {
221   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
222         0 gather 68 68 0 0 0
223       where:
224         1) 68 is the sendcounts
225         2) 68 is the recvcounts
226         3) 0 is the root node
227         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
228         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
229   */
230   CHECK_ACTION_PARAMS(action, 2, 3)
231   comm_size = MPI_COMM_WORLD->size();
232   send_size = parse_double(action[2]);
233   recv_size = parse_double(action[3]);
234
235   if (name == "gather") {
236     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
237     if (action.size() > 5)
238       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
239     if (action.size() > 6)
240       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
241   } else {
242     if (action.size() > 4)
243       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
244     if (action.size() > 5)
245       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
246   }
247 }
248
249 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
250 {
251   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
252        0 gather 68 68 10 10 10 0 0 0
253      where:
254        1) 68 is the sendcount
255        2) 68 10 10 10 is the recvcounts
256        3) 0 is the root node
257        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
258        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
259   */
260   comm_size = MPI_COMM_WORLD->size();
261   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
262   send_size  = parse_double(action[2]);
263   disps      = std::vector<int>(comm_size, 0);
264   recvcounts = std::make_shared<std::vector<int>>(comm_size);
265
266   if (name == "gatherv") {
267     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
268     if (action.size() > 4 + comm_size)
269       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
270     if (action.size() > 5 + comm_size)
271       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
272   } else {
273     int disp_index     = 0;
274     /* The 3 comes from "0 gather <sendcount>", which must always be present.
275      * The + comm_size is the recvcounts array, which must also be present
276      */
277     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
278       int datatype_index = 3 + comm_size;
279       disp_index     = datatype_index + 1;
280       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
281       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
282     } else if (action.size() >
283                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
284       disp_index = 3 + comm_size;
285     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
286       int datatype_index = 3 + comm_size;
287       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
288       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
289     }
290
291     if (disp_index != 0) {
292       for (unsigned int i = 0; i < comm_size; i++)
293         disps[i]          = std::stoi(action[disp_index + i]);
294     }
295   }
296
297   for (unsigned int i = 0; i < comm_size; i++) {
298     (*recvcounts)[i] = std::stoi(action[i + 3]);
299   }
300   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
301 }
302
303 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
304 {
305   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
306         0 gather 68 68 0 0 0
307       where:
308         1) 68 is the sendcounts
309         2) 68 is the recvcounts
310         3) 0 is the root node
311         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
312         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
313   */
314   CHECK_ACTION_PARAMS(action, 2, 3)
315   comm_size = MPI_COMM_WORLD->size();
316   send_size = parse_double(action[2]);
317   recv_size = parse_double(action[3]);
318   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
319   if (action.size() > 5)
320     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
321   if (action.size() > 6)
322     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
323 }
324
325 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
326 {
327   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
328      0 gather 68 10 10 10 68 0 0 0
329       where:
330       1) 68 10 10 10 is the sendcounts
331       2) 68 is the recvcount
332       3) 0 is the root node
333       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
334       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
335   */
336   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
337   recv_size  = parse_double(action[2 + comm_size]);
338   disps      = std::vector<int>(comm_size, 0);
339   sendcounts = std::make_shared<std::vector<int>>(comm_size);
340
341   if (action.size() > 5 + comm_size)
342     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
343   if (action.size() > 5 + comm_size)
344     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
345
346   for (unsigned int i = 0; i < comm_size; i++) {
347     (*sendcounts)[i] = std::stoi(action[i + 2]);
348   }
349   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
350   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
351 }
352
353 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
354 {
355   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
356        0 reducescatter 275427 275427 275427 204020 11346849 0
357      where:
358        1) The first four values after the name of the action declare the recvcounts array
359        2) The value 11346849 is the amount of instructions
360        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
361   */
362   comm_size = MPI_COMM_WORLD->size();
363   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
364   comp_size  = parse_double(action[2 + comm_size]);
365   recvcounts = std::make_shared<std::vector<int>>(comm_size);
366   if (action.size() > 3 + comm_size)
367     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
368
369   for (unsigned int i = 0; i < comm_size; i++) {
370     recvcounts->push_back(std::stoi(action[i + 2]));
371   }
372   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
373 }
374
375 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
376 {
377   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
378         0 alltoallv 100 1 7 10 12 100 1 70 10 5
379      where:
380       1) 100 is the size of the send buffer *sizeof(int),
381       2) 1 7 10 12 is the sendcounts array
382       3) 100*sizeof(int) is the size of the receiver buffer
383       4)  1 70 10 5 is the recvcounts array
384   */
385   comm_size = MPI_COMM_WORLD->size();
386   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
387   sendcounts = std::make_shared<std::vector<int>>(comm_size);
388   recvcounts = std::make_shared<std::vector<int>>(comm_size);
389   senddisps  = std::vector<int>(comm_size, 0);
390   recvdisps  = std::vector<int>(comm_size, 0);
391
392   if (action.size() > 5 + 2 * comm_size)
393     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
394   if (action.size() > 5 + 2 * comm_size)
395     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
396
397   send_buf_size = parse_double(action[2]);
398   recv_buf_size = parse_double(action[3 + comm_size]);
399   for (unsigned int i = 0; i < comm_size; i++) {
400     (*sendcounts)[i] = std::stoi(action[3 + i]);
401     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
402   }
403   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
404   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
405 }
406
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
408 {
409   std::string s = boost::algorithm::join(action, " ");
410   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411   const WaitTestParser& args = get_args();
412   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413   req_storage.remove(request);
414
415   if (request == MPI_REQUEST_NULL) {
416     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
417      * return.*/
418     return;
419   }
420
421   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
422
423   // Must be taken before Request::wait() since the request may be set to
424   // MPI_REQUEST_NULL by Request::wait!
425   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
426   // TODO: Here we take the rank while we normally take the process id (look for get_pid())
427   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
428
429   MPI_Status status;
430   Request::wait(&request, &status);
431
432   TRACE_smpi_comm_out(rank);
433   if (is_wait_for_receive)
434     TRACE_smpi_recv(args.src, args.dst, args.tag);
435 }
436
437 void SendAction::kernel(simgrid::xbt::ReplayAction&)
438 {
439   const SendRecvParser& args = get_args();
440   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
441
442   TRACE_smpi_comm_in(
443       get_pid(), __func__,
444       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
445   if (not TRACE_smpi_view_internals())
446     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
447
448   if (get_name() == "send") {
449     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450   } else if (get_name() == "isend") {
451     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452     req_storage.add(request);
453   } else {
454     xbt_die("Don't know this action, %s", get_name().c_str());
455   }
456
457   TRACE_smpi_comm_out(get_pid());
458 }
459
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
461 {
462   const SendRecvParser& args = get_args();
463   TRACE_smpi_comm_in(
464       get_pid(), __func__,
465       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
466
467   MPI_Status status;
468   // unknown size from the receiver point of view
469   double arg_size = args.size;
470   if (arg_size <= 0.0) {
471     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
472     arg_size = status.count;
473   }
474
475   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
476   if (get_name() == "recv") {
477     is_recv = true;
478     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
479   } else if (get_name() == "irecv") {
480     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
481     req_storage.add(request);
482   } else {
483     THROW_IMPOSSIBLE;
484   }
485
486   TRACE_smpi_comm_out(get_pid());
487   if (is_recv && not TRACE_smpi_view_internals()) {
488     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
489     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
490   }
491 }
492
493 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
494 {
495   const ComputeParser& args = get_args();
496   if (smpi_cfg_simulate_computation()) {
497     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
498   }
499 }
500
501 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
502 {
503   const SleepParser& args = get_args();
504   XBT_DEBUG("Sleep for: %lf secs", args.time);
505   int rank = simgrid::s4u::this_actor::get_pid();
506   TRACE_smpi_sleeping_in(rank, args.time);
507   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
508   TRACE_smpi_sleeping_out(rank);
509 }
510
511 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
512 {
513   const LocationParser& args = get_args();
514   smpi_trace_set_call_location(args.filename.c_str(), args.line);
515 }
516
517 void TestAction::kernel(simgrid::xbt::ReplayAction&)
518 {
519   const WaitTestParser& args = get_args();
520   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
521   req_storage.remove(request);
522   // if request is null here, this may mean that a previous test has succeeded
523   // Different times in traced application and replayed version may lead to this
524   // In this case, ignore the extra calls.
525   if (request != MPI_REQUEST_NULL) {
526     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
527
528     MPI_Status status;
529     int flag = 0;
530     Request::test(&request, &status, &flag);
531
532     XBT_DEBUG("MPI_Test result: %d", flag);
533     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
534      * nullptr.*/
535     if (request == MPI_REQUEST_NULL)
536       req_storage.addNullRequest(args.src, args.dst, args.tag);
537     else
538       req_storage.add(request);
539
540     TRACE_smpi_comm_out(get_pid());
541   }
542 }
543
544 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
545 {
546   CHECK_ACTION_PARAMS(action, 0, 1)
547     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
548     : MPI_BYTE;  // default TAU datatype
549
550   /* start a simulated timer */
551   smpi_process()->simulated_start();
552 }
553
554 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
555 {
556   /* nothing to do */
557 }
558
559 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
560 {
561   const unsigned int count_requests = req_storage.size();
562
563   if (count_requests > 0) {
564     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
565     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
566     std::vector<MPI_Request> reqs;
567     req_storage.get_requests(reqs);
568     for (auto const& req : reqs) {
569       if (req && (req->flags() & MPI_REQ_RECV)) {
570         sender_receiver.push_back({req->src(), req->dst()});
571       }
572     }
573     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
574     req_storage.get_store().clear();
575
576     for (auto const& pair : sender_receiver) {
577       TRACE_smpi_recv(pair.first, pair.second, 0);
578     }
579     TRACE_smpi_comm_out(get_pid());
580   }
581 }
582
583 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
584 {
585   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
586   colls::barrier(MPI_COMM_WORLD);
587   TRACE_smpi_comm_out(get_pid());
588 }
589
590 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
591 {
592   const BcastArgParser& args = get_args();
593   TRACE_smpi_comm_in(get_pid(), "action_bcast",
594                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(), -1.0,
595                                                     args.size, -1, Datatype::encode(args.datatype1), ""));
596
597   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
598
599   TRACE_smpi_comm_out(get_pid());
600 }
601
602 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
603 {
604   const ReduceArgParser& args = get_args();
605   TRACE_smpi_comm_in(get_pid(), "action_reduce",
606                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
607                                                     args.comp_size, args.comm_size, -1,
608                                                     Datatype::encode(args.datatype1), ""));
609
610   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
611                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
612                 args.root, MPI_COMM_WORLD);
613   private_execute_flops(args.comp_size);
614
615   TRACE_smpi_comm_out(get_pid());
616 }
617
618 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
619 {
620   const AllReduceArgParser& args = get_args();
621   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
622                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
623                                                     Datatype::encode(args.datatype1), ""));
624
625   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
626                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
627                    MPI_COMM_WORLD);
628   private_execute_flops(args.comp_size);
629
630   TRACE_smpi_comm_out(get_pid());
631 }
632
633 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
634 {
635   const AllToAllArgParser& args = get_args();
636   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
637                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
638                                                     Datatype::encode(args.datatype1),
639                                                     Datatype::encode(args.datatype2)));
640
641   colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
642                   recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
643                   MPI_COMM_WORLD);
644
645   TRACE_smpi_comm_out(get_pid());
646 }
647
648 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
649 {
650   const GatherArgParser& args = get_args();
651   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
652                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
653                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
654                                                     Datatype::encode(args.datatype2)));
655
656   if (get_name() == "gather") {
657     int rank = MPI_COMM_WORLD->rank();
658     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
659                   (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
660                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
661   } else
662     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
663                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
664                      MPI_COMM_WORLD);
665
666   TRACE_smpi_comm_out(get_pid());
667 }
668
669 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
670 {
671   int rank = MPI_COMM_WORLD->rank();
672   const GatherVArgParser& args = get_args();
673   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
674                      new simgrid::instr::VarCollTIData(
675                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
676                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
677
678   if (get_name() == "gatherv") {
679     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
681                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
682   } else {
683     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
684                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
685                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
686   }
687
688   TRACE_smpi_comm_out(get_pid());
689 }
690
691 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
692 {
693   int rank = MPI_COMM_WORLD->rank();
694   const ScatterArgParser& args = get_args();
695   TRACE_smpi_comm_in(get_pid(), "action_scatter",
696                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
697                                                     Datatype::encode(args.datatype1),
698                                                     Datatype::encode(args.datatype2)));
699
700   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
701                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
702                  args.datatype2, args.root, MPI_COMM_WORLD);
703
704   TRACE_smpi_comm_out(get_pid());
705 }
706
707 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
708 {
709   int rank = MPI_COMM_WORLD->rank();
710   const ScatterVArgParser& args = get_args();
711   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
712                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
713                                                        nullptr, Datatype::encode(args.datatype1),
714                                                        Datatype::encode(args.datatype2)));
715
716   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
717                   args.sendcounts->data(), args.disps.data(), args.datatype1,
718                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
719                   MPI_COMM_WORLD);
720
721   TRACE_smpi_comm_out(get_pid());
722 }
723
724 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
725 {
726   const ReduceScatterArgParser& args = get_args();
727   TRACE_smpi_comm_in(
728       get_pid(), "action_reducescatter",
729       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
730                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
731                                         Datatype::encode(args.datatype1)));
732
733   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
734                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
735                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
736
737   private_execute_flops(args.comp_size);
738   TRACE_smpi_comm_out(get_pid());
739 }
740
741 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
742 {
743   const AllToAllVArgParser& args = get_args();
744   TRACE_smpi_comm_in(get_pid(), __func__,
745                      new simgrid::instr::VarCollTIData(
746                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
747                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
748
749   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
750                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
751                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
752
753   TRACE_smpi_comm_out(get_pid());
754 }
755 } // Replay Namespace
756 }} // namespace simgrid::smpi
757
758 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
759 /** @brief Only initialize the replay, don't do it for real */
760 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
761 {
762   xbt_assert(not smpi_process()->initializing());
763
764   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
765   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
766   simgrid::smpi::ActorExt::init();
767
768   smpi_process()->mark_as_initialized();
769   smpi_process()->set_replaying(true);
770
771   int my_proc_id = simgrid::s4u::this_actor::get_pid();
772
773   TRACE_smpi_init(my_proc_id, "smpi_replay_run_init");
774   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
775   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
776   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
777   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
778   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
779   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
784   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
785   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
786   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
787   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
788   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
789   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
790   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
791   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
792   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
793   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
794   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
795   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
796   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
797   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
798   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
799   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
800   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
801   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
802
803   //if we have a delayed start, sleep here.
804   if (start_delay_flops > 0) {
805     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
806     private_execute_flops(start_delay_flops);
807   } else {
808     // Wait for the other actors to initialize also
809     simgrid::s4u::this_actor::yield();
810   }
811 }
812
813 /** @brief actually run the replay after initialization */
814 void smpi_replay_main(int rank, const char* trace_filename)
815 {
816   static int active_processes = 0;
817   active_processes++;
818   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
819   std::string rank_string                      = std::to_string(rank);
820   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
821
822   /* and now, finalize everything */
823   /* One active process will stop. Decrease the counter*/
824   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
825   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
826   if (count_requests > 0) {
827     auto* requests = new MPI_Request[count_requests];
828     unsigned int i=0;
829
830     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
831       requests[i] = pair.second;
832       i++;
833     }
834     simgrid::smpi::Request::waitall(count_requests, requests, MPI_STATUSES_IGNORE);
835     delete[] requests;
836   }
837   active_processes--;
838
839   if(active_processes==0){
840     /* Last process alive speaking: end the simulated timer */
841     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
842     smpi_free_replay_tmp_buffers();
843   }
844
845   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
846                      new simgrid::instr::NoOpTIData("finalize"));
847
848   smpi_process()->finalize();
849
850   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
851 }
852
853 /** @brief chain a replay initialization and a replay start */
854 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
855 {
856   smpi_replay_init(instance_id, rank, start_delay_flops);
857   smpi_replay_main(rank, trace_filename);
858 }