Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of framagit.org:simgrid/simgrid
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
65 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92   RequestStorage() = default;
93   int size() const { return store.size(); }
94
95   req_storage_t& get_store() { return store; }
96
97   void get_requests(std::vector<MPI_Request>& vec) const
98   {
99     for (auto const& pair : store) {
100       auto& req       = pair.second;
101       auto my_proc_id = simgrid::s4u::this_actor::get_pid();
102       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
103         vec.push_back(pair.second);
104         pair.second->print_request("MM");
105       }
106     }
107   }
108
109     MPI_Request find(int src, int dst, int tag)
110     {
111       auto it = store.find(req_key_t(src, dst, tag));
112       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
113     }
114
115     void remove(const Request* req)
116     {
117       if (req == MPI_REQUEST_NULL) return;
118
119       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
120     }
121
122     void add(MPI_Request req)
123     {
124       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
125         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
126     }
127
128     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
129     void addNullRequest(int src, int dst, int tag)
130     {
131       store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
132                     MPI_REQUEST_NULL});
133     }
134 };
135
136 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
137 {
138   CHECK_ACTION_PARAMS(action, 3, 0)
139   src = std::stoi(action[2]);
140   dst = std::stoi(action[3]);
141   tag = std::stoi(action[4]);
142 }
143
144 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 1)
147   partner = std::stoi(action[2]);
148   tag     = std::stoi(action[3]);
149   size    = parse_double(action[4]);
150   if (action.size() > 5)
151     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
152 }
153
154 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
155 {
156   CHECK_ACTION_PARAMS(action, 1, 0)
157   flops = parse_double(action[2]);
158 }
159
160 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
161 {
162   CHECK_ACTION_PARAMS(action, 1, 0)
163   time = parse_double(action[2]);
164 }
165
166 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
167 {
168   CHECK_ACTION_PARAMS(action, 2, 0)
169   filename = std::string(action[2]);
170   line = std::stoi(action[3]);
171 }
172
173 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
174 {
175   CHECK_ACTION_PARAMS(action, 1, 2)
176   size = parse_double(action[2]);
177   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
178   if (action.size() > 4)
179     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
180 }
181
182 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
183 {
184   CHECK_ACTION_PARAMS(action, 2, 2)
185   comm_size = parse_double(action[2]);
186   comp_size = parse_double(action[3]);
187   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
188   if (action.size() > 5)
189     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
190 }
191
192 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
193 {
194   CHECK_ACTION_PARAMS(action, 2, 1)
195   comm_size = parse_double(action[2]);
196   comp_size = parse_double(action[3]);
197   if (action.size() > 4)
198     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
199 }
200
201 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 {
203   CHECK_ACTION_PARAMS(action, 2, 1)
204   comm_size = MPI_COMM_WORLD->size();
205   send_size = parse_double(action[2]);
206   recv_size = parse_double(action[3]);
207
208   if (action.size() > 4)
209     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
210   if (action.size() > 5)
211     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
212 }
213
214 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
215 {
216   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
217         0 gather 68 68 0 0 0
218       where:
219         1) 68 is the sendcounts
220         2) 68 is the recvcounts
221         3) 0 is the root node
222         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
223         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
224   */
225   CHECK_ACTION_PARAMS(action, 2, 3)
226   comm_size = MPI_COMM_WORLD->size();
227   send_size = parse_double(action[2]);
228   recv_size = parse_double(action[3]);
229
230   if (name == "gather") {
231     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
232     if (action.size() > 5)
233       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
234     if (action.size() > 6)
235       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
236   } else {
237     if (action.size() > 4)
238       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
239     if (action.size() > 5)
240       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
241   }
242 }
243
244 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
245 {
246   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
247        0 gather 68 68 10 10 10 0 0 0
248      where:
249        1) 68 is the sendcount
250        2) 68 10 10 10 is the recvcounts
251        3) 0 is the root node
252        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
253        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
254   */
255   comm_size = MPI_COMM_WORLD->size();
256   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
257   send_size  = parse_double(action[2]);
258   disps      = std::vector<int>(comm_size, 0);
259   recvcounts = std::make_shared<std::vector<int>>(comm_size);
260
261   if (name == "gatherv") {
262     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
263     if (action.size() > 4 + comm_size)
264       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
265     if (action.size() > 5 + comm_size)
266       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
267   } else {
268     int disp_index     = 0;
269     /* The 3 comes from "0 gather <sendcount>", which must always be present.
270      * The + comm_size is the recvcounts array, which must also be present
271      */
272     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
273       int datatype_index = 3 + comm_size;
274       disp_index     = datatype_index + 1;
275       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
276       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
277     } else if (action.size() >
278                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
279       disp_index = 3 + comm_size;
280     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
281       int datatype_index = 3 + comm_size;
282       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
283       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
284     }
285
286     if (disp_index != 0) {
287       for (unsigned int i = 0; i < comm_size; i++)
288         disps[i]          = std::stoi(action[disp_index + i]);
289     }
290   }
291
292   for (unsigned int i = 0; i < comm_size; i++) {
293     (*recvcounts)[i] = std::stoi(action[i + 3]);
294   }
295   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
296 }
297
298 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
299 {
300   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
301         0 gather 68 68 0 0 0
302       where:
303         1) 68 is the sendcounts
304         2) 68 is the recvcounts
305         3) 0 is the root node
306         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
307         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
308   */
309   CHECK_ACTION_PARAMS(action, 2, 3)
310   comm_size = MPI_COMM_WORLD->size();
311   send_size = parse_double(action[2]);
312   recv_size = parse_double(action[3]);
313   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
314   if (action.size() > 5)
315     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
316   if (action.size() > 6)
317     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
318 }
319
320 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
321 {
322   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
323      0 gather 68 10 10 10 68 0 0 0
324       where:
325       1) 68 10 10 10 is the sendcounts
326       2) 68 is the recvcount
327       3) 0 is the root node
328       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
329       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
330   */
331   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
332   recv_size  = parse_double(action[2 + comm_size]);
333   disps      = std::vector<int>(comm_size, 0);
334   sendcounts = std::make_shared<std::vector<int>>(comm_size);
335
336   if (action.size() > 5 + comm_size)
337     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
338   if (action.size() > 5 + comm_size)
339     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
340
341   for (unsigned int i = 0; i < comm_size; i++) {
342     (*sendcounts)[i] = std::stoi(action[i + 2]);
343   }
344   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
345   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
346 }
347
348 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
349 {
350   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
351        0 reducescatter 275427 275427 275427 204020 11346849 0
352      where:
353        1) The first four values after the name of the action declare the recvcounts array
354        2) The value 11346849 is the amount of instructions
355        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
356   */
357   comm_size = MPI_COMM_WORLD->size();
358   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
359   comp_size  = parse_double(action[2 + comm_size]);
360   recvcounts = std::make_shared<std::vector<int>>(comm_size);
361   if (action.size() > 3 + comm_size)
362     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
363
364   for (unsigned int i = 0; i < comm_size; i++) {
365     recvcounts->push_back(std::stoi(action[i + 2]));
366   }
367   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
368 }
369
370 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
371 {
372   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
373         0 alltoallv 100 1 7 10 12 100 1 70 10 5
374      where:
375       1) 100 is the size of the send buffer *sizeof(int),
376       2) 1 7 10 12 is the sendcounts array
377       3) 100*sizeof(int) is the size of the receiver buffer
378       4)  1 70 10 5 is the recvcounts array
379   */
380   comm_size = MPI_COMM_WORLD->size();
381   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
382   sendcounts = std::make_shared<std::vector<int>>(comm_size);
383   recvcounts = std::make_shared<std::vector<int>>(comm_size);
384   senddisps  = std::vector<int>(comm_size, 0);
385   recvdisps  = std::vector<int>(comm_size, 0);
386
387   if (action.size() > 5 + 2 * comm_size)
388     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
389   if (action.size() > 5 + 2 * comm_size)
390     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
391
392   send_buf_size = parse_double(action[2]);
393   recv_buf_size = parse_double(action[3 + comm_size]);
394   for (unsigned int i = 0; i < comm_size; i++) {
395     (*sendcounts)[i] = std::stoi(action[3 + i]);
396     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
397   }
398   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
399   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
400 }
401
402 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
403 {
404   std::string s = boost::algorithm::join(action, " ");
405   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
406   const WaitTestParser& args = get_args();
407   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
408   req_storage.remove(request);
409
410   if (request == MPI_REQUEST_NULL) {
411     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
412      * return.*/
413     return;
414   }
415
416   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
417
418   // Must be taken before Request::wait() since the request may be set to
419   // MPI_REQUEST_NULL by Request::wait!
420   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
421   // TODO: Here we take the rank while we normally take the process id (look for get_pid())
422   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
423
424   MPI_Status status;
425   Request::wait(&request, &status);
426
427   TRACE_smpi_comm_out(rank);
428   if (is_wait_for_receive)
429     TRACE_smpi_recv(args.src, args.dst, args.tag);
430 }
431
432 void SendAction::kernel(simgrid::xbt::ReplayAction&)
433 {
434   const SendRecvParser& args = get_args();
435   int dst_traced             = MPI_COMM_WORLD->group()->actor(args.partner);
436
437   TRACE_smpi_comm_in(
438       get_pid(), __func__,
439       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
440   if (not TRACE_smpi_view_internals())
441     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
442
443   if (get_name() == "send") {
444     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
445   } else if (get_name() == "isend") {
446     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447     req_storage.add(request);
448   } else {
449     xbt_die("Don't know this action, %s", get_name().c_str());
450   }
451
452   TRACE_smpi_comm_out(get_pid());
453 }
454
455 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
456 {
457   const SendRecvParser& args = get_args();
458   TRACE_smpi_comm_in(
459       get_pid(), __func__,
460       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
461
462   MPI_Status status;
463   // unknown size from the receiver point of view
464   double arg_size = args.size;
465   if (arg_size <= 0.0) {
466     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
467     arg_size = status.count;
468   }
469
470   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
471   if (get_name() == "recv") {
472     is_recv = true;
473     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
474   } else if (get_name() == "irecv") {
475     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
476     req_storage.add(request);
477   } else {
478     THROW_IMPOSSIBLE;
479   }
480
481   TRACE_smpi_comm_out(get_pid());
482   if (is_recv && not TRACE_smpi_view_internals()) {
483     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
484     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
485   }
486 }
487
488 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
489 {
490   const ComputeParser& args = get_args();
491   if (smpi_cfg_simulate_computation()) {
492     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
493   }
494 }
495
496 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
497 {
498   const SleepParser& args = get_args();
499   XBT_DEBUG("Sleep for: %lf secs", args.time);
500   int rank = simgrid::s4u::this_actor::get_pid();
501   TRACE_smpi_sleeping_in(rank, args.time);
502   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
503   TRACE_smpi_sleeping_out(rank);
504 }
505
506 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
507 {
508   const LocationParser& args = get_args();
509   smpi_trace_set_call_location(args.filename.c_str(), args.line);
510 }
511
512 void TestAction::kernel(simgrid::xbt::ReplayAction&)
513 {
514   const WaitTestParser& args = get_args();
515   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
516   req_storage.remove(request);
517   // if request is null here, this may mean that a previous test has succeeded
518   // Different times in traced application and replayed version may lead to this
519   // In this case, ignore the extra calls.
520   if (request != MPI_REQUEST_NULL) {
521     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
522
523     MPI_Status status;
524     int flag = 0;
525     Request::test(&request, &status, &flag);
526
527     XBT_DEBUG("MPI_Test result: %d", flag);
528     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
529      * nullptr.*/
530     if (request == MPI_REQUEST_NULL)
531       req_storage.addNullRequest(args.src, args.dst, args.tag);
532     else
533       req_storage.add(request);
534
535     TRACE_smpi_comm_out(get_pid());
536   }
537 }
538
539 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
540 {
541   CHECK_ACTION_PARAMS(action, 0, 1)
542     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
543     : MPI_BYTE;  // default TAU datatype
544
545   /* start a simulated timer */
546   smpi_process()->simulated_start();
547 }
548
549 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
550 {
551   /* nothing to do */
552 }
553
554 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
555 {
556   const unsigned int count_requests = req_storage.size();
557
558   if (count_requests > 0) {
559     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
560     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
561     std::vector<MPI_Request> reqs;
562     req_storage.get_requests(reqs);
563     for (auto const& req : reqs) {
564       if (req && (req->flags() & MPI_REQ_RECV)) {
565         sender_receiver.emplace_back(req->src(), req->dst());
566       }
567     }
568     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
569     req_storage.get_store().clear();
570
571     for (auto const& pair : sender_receiver) {
572       TRACE_smpi_recv(pair.first, pair.second, 0);
573     }
574     TRACE_smpi_comm_out(get_pid());
575   }
576 }
577
578 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
579 {
580   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
581   colls::barrier(MPI_COMM_WORLD);
582   TRACE_smpi_comm_out(get_pid());
583 }
584
585 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
586 {
587   const BcastArgParser& args = get_args();
588   TRACE_smpi_comm_in(get_pid(), "action_bcast",
589                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root), -1.0, args.size,
590                                                     -1, Datatype::encode(args.datatype1), ""));
591
592   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
593
594   TRACE_smpi_comm_out(get_pid());
595 }
596
597 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
598 {
599   const ReduceArgParser& args = get_args();
600   TRACE_smpi_comm_in(get_pid(), "action_reduce",
601                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root), args.comp_size,
602                                                     args.comm_size, -1, Datatype::encode(args.datatype1), ""));
603
604   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
605                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
606                 args.root, MPI_COMM_WORLD);
607   private_execute_flops(args.comp_size);
608
609   TRACE_smpi_comm_out(get_pid());
610 }
611
612 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
613 {
614   const AllReduceArgParser& args = get_args();
615   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
616                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
617                                                     Datatype::encode(args.datatype1), ""));
618
619   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
620                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
621                    MPI_COMM_WORLD);
622   private_execute_flops(args.comp_size);
623
624   TRACE_smpi_comm_out(get_pid());
625 }
626
627 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
628 {
629   const AllToAllArgParser& args = get_args();
630   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
631                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
632                                                     Datatype::encode(args.datatype1),
633                                                     Datatype::encode(args.datatype2)));
634
635   colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
636                   recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
637                   MPI_COMM_WORLD);
638
639   TRACE_smpi_comm_out(get_pid());
640 }
641
642 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
643 {
644   const GatherArgParser& args = get_args();
645   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
646                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
647                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
648                                                     Datatype::encode(args.datatype2)));
649
650   if (get_name() == "gather") {
651     int rank = MPI_COMM_WORLD->rank();
652     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
653                   (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
654                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
655   } else
656     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
657                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
658                      MPI_COMM_WORLD);
659
660   TRACE_smpi_comm_out(get_pid());
661 }
662
663 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
664 {
665   int rank = MPI_COMM_WORLD->rank();
666   const GatherVArgParser& args = get_args();
667   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
668                      new simgrid::instr::VarCollTIData(
669                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
670                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
671
672   if (get_name() == "gatherv") {
673     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
674                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
675                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
676   } else {
677     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
678                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
679                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
680   }
681
682   TRACE_smpi_comm_out(get_pid());
683 }
684
685 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
686 {
687   int rank = MPI_COMM_WORLD->rank();
688   const ScatterArgParser& args = get_args();
689   TRACE_smpi_comm_in(get_pid(), "action_scatter",
690                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
691                                                     Datatype::encode(args.datatype1),
692                                                     Datatype::encode(args.datatype2)));
693
694   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
695                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
696                  args.datatype2, args.root, MPI_COMM_WORLD);
697
698   TRACE_smpi_comm_out(get_pid());
699 }
700
701 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
702 {
703   int rank = MPI_COMM_WORLD->rank();
704   const ScatterVArgParser& args = get_args();
705   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
706                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
707                                                        nullptr, Datatype::encode(args.datatype1),
708                                                        Datatype::encode(args.datatype2)));
709
710   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
711                   args.sendcounts->data(), args.disps.data(), args.datatype1,
712                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
713                   MPI_COMM_WORLD);
714
715   TRACE_smpi_comm_out(get_pid());
716 }
717
718 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
719 {
720   const ReduceScatterArgParser& args = get_args();
721   TRACE_smpi_comm_in(
722       get_pid(), "action_reducescatter",
723       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
724                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
725                                         Datatype::encode(args.datatype1)));
726
727   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
728                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
729                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
730
731   private_execute_flops(args.comp_size);
732   TRACE_smpi_comm_out(get_pid());
733 }
734
735 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
736 {
737   const AllToAllVArgParser& args = get_args();
738   TRACE_smpi_comm_in(get_pid(), __func__,
739                      new simgrid::instr::VarCollTIData(
740                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
741                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
742
743   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
744                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
745                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
746
747   TRACE_smpi_comm_out(get_pid());
748 }
749 } // Replay Namespace
750 }} // namespace simgrid::smpi
751
752 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
753 /** @brief Only initialize the replay, don't do it for real */
754 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
755 {
756   xbt_assert(not smpi_process()->initializing());
757
758   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
759   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
760   simgrid::smpi::ActorExt::init();
761
762   smpi_process()->mark_as_initialized();
763   smpi_process()->set_replaying(true);
764
765   int my_proc_id = simgrid::s4u::this_actor::get_pid();
766
767   TRACE_smpi_init(my_proc_id, "smpi_replay_run_init");
768   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
769   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
770   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
771   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
772   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
773   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
774   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
775   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
776   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
781   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
782   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
783   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
784   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
785   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
786   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
787   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
788   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
789   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
790   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
791   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
792   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
793   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
794   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
795   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
796
797   //if we have a delayed start, sleep here.
798   if (start_delay_flops > 0) {
799     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
800     private_execute_flops(start_delay_flops);
801   } else {
802     // Wait for the other actors to initialize also
803     simgrid::s4u::this_actor::yield();
804   }
805 }
806
807 /** @brief actually run the replay after initialization */
808 void smpi_replay_main(int rank, const char* private_trace_filename)
809 {
810   static int active_processes = 0;
811   active_processes++;
812   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
813   std::string rank_string                      = std::to_string(rank);
814   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
815
816   /* and now, finalize everything */
817   /* One active process will stop. Decrease the counter*/
818   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
819   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
820   if (count_requests > 0) {
821     std::vector<MPI_Request> requests(count_requests);
822     unsigned int i=0;
823
824     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
825       requests[i] = pair.second;
826       i++;
827     }
828     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
829   }
830   active_processes--;
831
832   if(active_processes==0){
833     /* Last process alive speaking: end the simulated timer */
834     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
835     smpi_free_replay_tmp_buffers();
836   }
837
838   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
839                      new simgrid::instr::NoOpTIData("finalize"));
840
841   smpi_process()->finalize();
842
843   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
844 }
845
846 /** @brief chain a replay initialization and a replay start */
847 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
848 {
849   smpi_replay_init(instance_id, rank, start_delay_flops);
850   smpi_replay_main(rank, private_trace_filename);
851 }