Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Apply clang-format to ArgParsers
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14 #include <simgrid/smpi/replay.hpp>
15
16 #include <boost/algorithm/string/join.hpp>
17 #include <memory>
18 #include <numeric>
19 #include <unordered_map>
20 #include <vector>
21
22 #include <tuple>
23 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
24 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
25 // this could go into a header file.
26 namespace hash_tuple {
27 template <typename TT> class hash {
28 public:
29   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
30 };
31
32 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
33 {
34   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
35 }
36
37 // Recursive template code derived from Matthieu M.
38 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
39 public:
40   static void apply(size_t& seed, Tuple const& tuple)
41   {
42     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
43     hash_combine(seed, std::get<Index>(tuple));
44   }
45 };
46
47 template <class Tuple> class HashValueImpl<Tuple, 0> {
48 public:
49   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
50 };
51
52 template <typename... TT> class hash<std::tuple<TT...>> {
53 public:
54   size_t operator()(std::tuple<TT...> const& tt) const
55   {
56     size_t seed = 0;
57     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
58     return seed;
59   }
60 };
61 }
62
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
64
65 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67
68
69 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
70 {
71   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
72     std::string s = boost::algorithm::join(action, " ");
73     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
74   }
75 }
76
77 /* Helper function */
78 static double parse_double(std::string string)
79 {
80   return xbt_str_parse_double(string.c_str(), "%s is not a double");
81 }
82
83 namespace simgrid {
84 namespace smpi {
85
86 namespace replay {
87 MPI_Datatype MPI_DEFAULT_TYPE;
88
89 class RequestStorage {
90 private:
91     req_storage_t store;
92
93 public:
94     RequestStorage() {}
95     int size()
96     {
97       return store.size();
98     }
99
100     req_storage_t& get_store()
101     {
102       return store;
103     }
104
105     void get_requests(std::vector<MPI_Request>& vec)
106     {
107       for (auto& pair : store) {
108         auto& req = pair.second;
109         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
110         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
111           vec.push_back(pair.second);
112           pair.second->print_request("MM");
113         }
114       }
115     }
116
117     MPI_Request find(int src, int dst, int tag)
118     {
119       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
120       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121     }
122
123     void remove(MPI_Request req)
124     {
125       if (req == MPI_REQUEST_NULL) return;
126
127       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128     }
129
130     void add(MPI_Request req)
131     {
132       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
133         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134     }
135
136     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
137     void addNullRequest(int src, int dst, int tag)
138     {
139       store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
140     }
141 };
142
143 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
144 {
145   CHECK_ACTION_PARAMS(action, 3, 0)
146   src = std::stoi(action[2]);
147   dst = std::stoi(action[3]);
148   tag = std::stoi(action[4]);
149 }
150
151 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
152 {
153   CHECK_ACTION_PARAMS(action, 3, 1)
154   partner = std::stoi(action[2]);
155   tag     = std::stoi(action[3]);
156   size    = parse_double(action[4]);
157   if (action.size() > 5)
158     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
159 }
160
161 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
162 {
163   CHECK_ACTION_PARAMS(action, 1, 0)
164   flops = parse_double(action[2]);
165 }
166
167 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
168 {
169   CHECK_ACTION_PARAMS(action, 1, 2)
170   size = parse_double(action[2]);
171   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
172   if (action.size() > 4)
173     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
174 }
175
176 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
177 {
178   CHECK_ACTION_PARAMS(action, 2, 2)
179   comm_size = parse_double(action[2]);
180   comp_size = parse_double(action[3]);
181   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
182   if (action.size() > 5)
183     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
184 }
185
186 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
187 {
188   CHECK_ACTION_PARAMS(action, 2, 1)
189   comm_size = parse_double(action[2]);
190   comp_size = parse_double(action[3]);
191   if (action.size() > 4)
192     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
193 }
194
195 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
196 {
197   CHECK_ACTION_PARAMS(action, 2, 1)
198   comm_size = MPI_COMM_WORLD->size();
199   send_size = parse_double(action[2]);
200   recv_size = parse_double(action[3]);
201
202   if (action.size() > 4)
203     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
204   if (action.size() > 5)
205     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
206 }
207
208 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
209 {
210   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
211         0 gather 68 68 0 0 0
212       where:
213         1) 68 is the sendcounts
214         2) 68 is the recvcounts
215         3) 0 is the root node
216         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
217         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
218   */
219   CHECK_ACTION_PARAMS(action, 2, 3)
220   comm_size = MPI_COMM_WORLD->size();
221   send_size = parse_double(action[2]);
222   recv_size = parse_double(action[3]);
223
224   if (name == "gather") {
225     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
226     if (action.size() > 5)
227       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
228     if (action.size() > 6)
229       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
230   } else {
231     if (action.size() > 4)
232       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
233     if (action.size() > 5)
234       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
235   }
236 }
237
238 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
239 {
240   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
241        0 gather 68 68 10 10 10 0 0 0
242      where:
243        1) 68 is the sendcount
244        2) 68 10 10 10 is the recvcounts
245        3) 0 is the root node
246        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
247        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
248   */
249   comm_size = MPI_COMM_WORLD->size();
250   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
251   send_size  = parse_double(action[2]);
252   disps      = std::vector<int>(comm_size, 0);
253   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
254
255   if (name == "gatherV") {
256     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
257     if (action.size() > 4 + comm_size)
258       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
259     if (action.size() > 5 + comm_size)
260       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
261   } else {
262     int datatype_index = 0;
263     int disp_index     = 0;
264     /* The 3 comes from "0 gather <sendcount>", which must always be present.
265      * The + comm_size is the recvcounts array, which must also be present
266      */
267     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
268       datatype_index = 3 + comm_size;
269       disp_index     = datatype_index + 1;
270       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
271       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
272     } else if (action.size() >
273                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
274       disp_index = 3 + comm_size;
275     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
276       datatype_index = 3 + comm_size;
277       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
278       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
279     }
280
281     if (disp_index != 0) {
282       for (unsigned int i = 0; i < comm_size; i++)
283         disps[i]          = std::stoi(action[disp_index + i]);
284     }
285   }
286
287   for (unsigned int i = 0; i < comm_size; i++) {
288     (*recvcounts)[i] = std::stoi(action[i + 3]);
289   }
290   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
291 }
292
293 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
294 {
295   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
296         0 gather 68 68 0 0 0
297       where:
298         1) 68 is the sendcounts
299         2) 68 is the recvcounts
300         3) 0 is the root node
301         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
302         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
303   */
304   CHECK_ACTION_PARAMS(action, 2, 3)
305   comm_size = MPI_COMM_WORLD->size();
306   send_size = parse_double(action[2]);
307   recv_size = parse_double(action[3]);
308   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
309   if (action.size() > 5)
310     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
311   if (action.size() > 6)
312     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
313 }
314
315 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
316 {
317   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
318      0 gather 68 10 10 10 68 0 0 0
319       where:
320       1) 68 10 10 10 is the sendcounts
321       2) 68 is the recvcount
322       3) 0 is the root node
323       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
324       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
325   */
326   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
327   recv_size  = parse_double(action[2 + comm_size]);
328   disps      = std::vector<int>(comm_size, 0);
329   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
330
331   if (action.size() > 5 + comm_size)
332     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
333   if (action.size() > 5 + comm_size)
334     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
335
336   for (unsigned int i = 0; i < comm_size; i++) {
337     (*sendcounts)[i] = std::stoi(action[i + 2]);
338   }
339   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
340   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
341 }
342
343 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
344 {
345   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
346        0 reduceScatter 275427 275427 275427 204020 11346849 0
347      where:
348        1) The first four values after the name of the action declare the recvcounts array
349        2) The value 11346849 is the amount of instructions
350        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
351   */
352   comm_size = MPI_COMM_WORLD->size();
353   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
354   comp_size  = parse_double(action[2 + comm_size]);
355   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
356   if (action.size() > 3 + comm_size)
357     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
358
359   for (unsigned int i = 0; i < comm_size; i++) {
360     recvcounts->push_back(std::stoi(action[i + 2]));
361   }
362   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
363 }
364
365 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
366 {
367   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
368         0 allToAllV 100 1 7 10 12 100 1 70 10 5
369      where:
370       1) 100 is the size of the send buffer *sizeof(int),
371       2) 1 7 10 12 is the sendcounts array
372       3) 100*sizeof(int) is the size of the receiver buffer
373       4)  1 70 10 5 is the recvcounts array
374   */
375   comm_size = MPI_COMM_WORLD->size();
376   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
377   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
378   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379   senddisps  = std::vector<int>(comm_size, 0);
380   recvdisps  = std::vector<int>(comm_size, 0);
381
382   if (action.size() > 5 + 2 * comm_size)
383     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
384   if (action.size() > 5 + 2 * comm_size)
385     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
386
387   send_buf_size = parse_double(action[2]);
388   recv_buf_size = parse_double(action[3 + comm_size]);
389   for (unsigned int i = 0; i < comm_size; i++) {
390     (*sendcounts)[i] = std::stoi(action[3 + i]);
391     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
392   }
393   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
394   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
395 }
396
397 template<class T>
398 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
399 {
400   // Needs to be re-initialized for every action, hence here
401   double start_time = smpi_process()->simulated_elapsed();
402   args.parse(action, name);
403   kernel(action);
404   if (name != "Init")
405     log_timed_action(action, start_time);
406 }
407
408 class WaitAction : public ReplayAction<WaitTestParser> {
409 private:
410   RequestStorage& req_storage;
411
412 public:
413   explicit WaitAction(RequestStorage& storage) : ReplayAction("Wait"), req_storage(storage) {}
414   void kernel(simgrid::xbt::ReplayAction& action) override
415   {
416     std::string s = boost::algorithm::join(action, " ");
417     xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
418     MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
419     req_storage.remove(request);
420
421     if (request == MPI_REQUEST_NULL) {
422       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
423        * return.*/
424       return;
425     }
426
427     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
428
429     // Must be taken before Request::wait() since the request may be set to
430     // MPI_REQUEST_NULL by Request::wait!
431     bool is_wait_for_receive = (request->flags() & RECV);
432     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
433     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
434
435     MPI_Status status;
436     Request::wait(&request, &status);
437
438     TRACE_smpi_comm_out(rank);
439     if (is_wait_for_receive)
440       TRACE_smpi_recv(args.src, args.dst, args.tag);
441   }
442 };
443
444 class SendAction : public ReplayAction<SendRecvParser> {
445 private:
446   RequestStorage& req_storage;
447
448 public:
449   explicit SendAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
450   void kernel(simgrid::xbt::ReplayAction& action) override
451   {
452     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
453
454     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
455                                                                              args.tag, Datatype::encode(args.datatype1)));
456     if (not TRACE_smpi_view_internals())
457       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
458
459     if (name == "send") {
460       Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
461     } else if (name == "Isend") {
462       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
463       req_storage.add(request);
464     } else {
465       xbt_die("Don't know this action, %s", name.c_str());
466     }
467
468     TRACE_smpi_comm_out(my_proc_id);
469   }
470 };
471
472 class RecvAction : public ReplayAction<SendRecvParser> {
473 private:
474   RequestStorage& req_storage;
475
476 public:
477   explicit RecvAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
478   void kernel(simgrid::xbt::ReplayAction& action) override
479   {
480     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
481
482     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
483                                                                              args.tag, Datatype::encode(args.datatype1)));
484
485     MPI_Status status;
486     // unknown size from the receiver point of view
487     if (args.size <= 0.0) {
488       Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
489       args.size = status.count;
490     }
491
492     if (name == "recv") {
493       Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
494     } else if (name == "Irecv") {
495       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
496       req_storage.add(request);
497     }
498
499     TRACE_smpi_comm_out(my_proc_id);
500     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
501     if (name == "recv" && not TRACE_smpi_view_internals()) {
502       TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
503     }
504   }
505 };
506
507 class ComputeAction : public ReplayAction<ComputeParser> {
508 public:
509   ComputeAction() : ReplayAction("compute") {}
510   void kernel(simgrid::xbt::ReplayAction& action) override
511   {
512     TRACE_smpi_computing_in(my_proc_id, args.flops);
513     smpi_execute_flops(args.flops);
514     TRACE_smpi_computing_out(my_proc_id);
515   }
516 };
517
518 class TestAction : public ReplayAction<WaitTestParser> {
519 private:
520   RequestStorage& req_storage;
521
522 public:
523   explicit TestAction(RequestStorage& storage) : ReplayAction("Test"), req_storage(storage) {}
524   void kernel(simgrid::xbt::ReplayAction& action) override
525   {
526     MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
527     req_storage.remove(request);
528     // if request is null here, this may mean that a previous test has succeeded
529     // Different times in traced application and replayed version may lead to this
530     // In this case, ignore the extra calls.
531     if (request != MPI_REQUEST_NULL) {
532       TRACE_smpi_testing_in(my_proc_id);
533
534       MPI_Status status;
535       int flag = Request::test(&request, &status);
536
537       XBT_DEBUG("MPI_Test result: %d", flag);
538       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
539        * nullptr.*/
540       if (request == MPI_REQUEST_NULL)
541         req_storage.addNullRequest(args.src, args.dst, args.tag);
542       else
543         req_storage.add(request);
544
545       TRACE_smpi_testing_out(my_proc_id);
546     }
547   }
548 };
549
550 class InitAction : public ReplayAction<ActionArgParser> {
551 public:
552   InitAction() : ReplayAction("Init") {}
553   void kernel(simgrid::xbt::ReplayAction& action) override
554   {
555     CHECK_ACTION_PARAMS(action, 0, 1)
556     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
557                                            : MPI_BYTE;  // default TAU datatype
558
559     /* start a simulated timer */
560     smpi_process()->simulated_start();
561   }
562 };
563
564 class CommunicatorAction : public ReplayAction<ActionArgParser> {
565 public:
566   CommunicatorAction() : ReplayAction("Comm") {}
567   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
568 };
569
570 class WaitAllAction : public ReplayAction<ActionArgParser> {
571 private:
572   RequestStorage& req_storage;
573
574 public:
575   explicit WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll"), req_storage(storage) {}
576   void kernel(simgrid::xbt::ReplayAction& action) override
577   {
578     const unsigned int count_requests = req_storage.size();
579
580     if (count_requests > 0) {
581       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
582       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
583       std::vector<MPI_Request> reqs;
584       req_storage.get_requests(reqs);
585       for (const auto& req : reqs) {
586         if (req && (req->flags() & RECV)) {
587           sender_receiver.push_back({req->src(), req->dst()});
588         }
589       }
590       MPI_Status status[count_requests];
591       Request::waitall(count_requests, &(reqs.data())[0], status);
592       req_storage.get_store().clear();
593
594       for (auto& pair : sender_receiver) {
595         TRACE_smpi_recv(pair.first, pair.second, 0);
596       }
597       TRACE_smpi_comm_out(my_proc_id);
598     }
599   }
600 };
601
602 class BarrierAction : public ReplayAction<ActionArgParser> {
603 public:
604   BarrierAction() : ReplayAction("barrier") {}
605   void kernel(simgrid::xbt::ReplayAction& action) override
606   {
607     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
608     Colls::barrier(MPI_COMM_WORLD);
609     TRACE_smpi_comm_out(my_proc_id);
610   }
611 };
612
613 class BcastAction : public ReplayAction<BcastArgParser> {
614 public:
615   BcastAction() : ReplayAction("bcast") {}
616   void kernel(simgrid::xbt::ReplayAction& action) override
617   {
618     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
619                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
620                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
621
622     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
623
624     TRACE_smpi_comm_out(my_proc_id);
625   }
626 };
627
628 class ReduceAction : public ReplayAction<ReduceArgParser> {
629 public:
630   ReduceAction() : ReplayAction("reduce") {}
631   void kernel(simgrid::xbt::ReplayAction& action) override
632   {
633     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
634                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
635                                                       args.comp_size, args.comm_size, -1,
636                                                       Datatype::encode(args.datatype1), ""));
637
638     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
639         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
640     smpi_execute_flops(args.comp_size);
641
642     TRACE_smpi_comm_out(my_proc_id);
643   }
644 };
645
646 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
647 public:
648   AllReduceAction() : ReplayAction("allReduce") {}
649   void kernel(simgrid::xbt::ReplayAction& action) override
650   {
651     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
652                                                                                 Datatype::encode(args.datatype1), ""));
653
654     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
655         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
656     smpi_execute_flops(args.comp_size);
657
658     TRACE_smpi_comm_out(my_proc_id);
659   }
660 };
661
662 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
663 public:
664   AllToAllAction() : ReplayAction("allToAll") {}
665   void kernel(simgrid::xbt::ReplayAction& action) override
666   {
667     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
668                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
669                                                     Datatype::encode(args.datatype1),
670                                                     Datatype::encode(args.datatype2)));
671
672     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
673                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
674                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
675
676     TRACE_smpi_comm_out(my_proc_id);
677   }
678 };
679
680 class GatherAction : public ReplayAction<GatherArgParser> {
681 public:
682   explicit GatherAction(std::string name) : ReplayAction(name) {}
683   void kernel(simgrid::xbt::ReplayAction& action) override
684   {
685     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
686                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
687
688     if (name == "gather") {
689       int rank = MPI_COMM_WORLD->rank();
690       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
691                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
692     }
693     else
694       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
695                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
696
697     TRACE_smpi_comm_out(my_proc_id);
698   }
699 };
700
701 class GatherVAction : public ReplayAction<GatherVArgParser> {
702 public:
703   explicit GatherVAction(std::string name) : ReplayAction(name) {}
704   void kernel(simgrid::xbt::ReplayAction& action) override
705   {
706     int rank = MPI_COMM_WORLD->rank();
707
708     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
709                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
710                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
711
712     if (name == "gatherV") {
713       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
714                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
715                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
716     }
717     else {
718       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
719                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
720                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
721     }
722
723     TRACE_smpi_comm_out(my_proc_id);
724   }
725 };
726
727 class ScatterAction : public ReplayAction<ScatterArgParser> {
728 public:
729   ScatterAction() : ReplayAction("scatter") {}
730   void kernel(simgrid::xbt::ReplayAction& action) override
731   {
732     int rank = MPI_COMM_WORLD->rank();
733     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
734                                                                           Datatype::encode(args.datatype1),
735                                                                           Datatype::encode(args.datatype2)));
736
737     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
738                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
739
740     TRACE_smpi_comm_out(my_proc_id);
741   }
742 };
743
744
745 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
746 public:
747   ScatterVAction() : ReplayAction("scatterV") {}
748   void kernel(simgrid::xbt::ReplayAction& action) override
749   {
750     int rank = MPI_COMM_WORLD->rank();
751     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
752           nullptr, Datatype::encode(args.datatype1),
753           Datatype::encode(args.datatype2)));
754
755     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
756                     args.sendcounts->data(), args.disps.data(), args.datatype1,
757                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
758                     MPI_COMM_WORLD);
759
760     TRACE_smpi_comm_out(my_proc_id);
761   }
762 };
763
764 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
765 public:
766   ReduceScatterAction() : ReplayAction("reduceScatter") {}
767   void kernel(simgrid::xbt::ReplayAction& action) override
768   {
769     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
770                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
771                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
772                                                          Datatype::encode(args.datatype1)));
773
774     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
775                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
776                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
777
778     smpi_execute_flops(args.comp_size);
779     TRACE_smpi_comm_out(my_proc_id);
780   }
781 };
782
783 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
784 public:
785   AllToAllVAction() : ReplayAction("allToAllV") {}
786   void kernel(simgrid::xbt::ReplayAction& action) override
787   {
788     TRACE_smpi_comm_in(my_proc_id, __func__,
789                        new simgrid::instr::VarCollTIData(
790                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
791                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
792
793     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
794                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
795
796     TRACE_smpi_comm_out(my_proc_id);
797   }
798 };
799 } // Replay Namespace
800 }} // namespace simgrid::smpi
801
802 std::vector<simgrid::smpi::replay::RequestStorage> storage;
803 /** @brief Only initialize the replay, don't do it for real */
804 void smpi_replay_init(int* argc, char*** argv)
805 {
806   simgrid::smpi::Process::init(argc, argv);
807   smpi_process()->mark_as_initialized();
808   smpi_process()->set_replaying(true);
809
810   int my_proc_id = simgrid::s4u::this_actor::get_pid();
811   storage.resize(smpi_process_count());
812
813   TRACE_smpi_init(my_proc_id);
814   TRACE_smpi_computing_init(my_proc_id);
815   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
816   TRACE_smpi_comm_out(my_proc_id);
817   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
818   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
819   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
820   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
821   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
823   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
824   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
825   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
826   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
827   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
828   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
829   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
830   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
831   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
832   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
833   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
834   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
835   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
836   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
837   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
838   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
839   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
840   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
841   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
842   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
843
844   //if we have a delayed start, sleep here.
845   if(*argc>2){
846     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
847     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
848     smpi_execute_flops(value);
849   } else {
850     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
851     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
852     smpi_execute_flops(0.0);
853   }
854 }
855
856 /** @brief actually run the replay after initialization */
857 void smpi_replay_main(int* argc, char*** argv)
858 {
859   static int active_processes = 0;
860   active_processes++;
861   simgrid::xbt::replay_runner(*argc, *argv);
862
863   /* and now, finalize everything */
864   /* One active process will stop. Decrease the counter*/
865   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
866   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
867   if (count_requests > 0) {
868     MPI_Request requests[count_requests];
869     MPI_Status status[count_requests];
870     unsigned int i=0;
871
872     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
873       requests[i] = pair.second;
874       i++;
875     }
876     simgrid::smpi::Request::waitall(count_requests, requests, status);
877   }
878   active_processes--;
879
880   if(active_processes==0){
881     /* Last process alive speaking: end the simulated timer */
882     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
883     smpi_free_replay_tmp_buffers();
884   }
885
886   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
887                      new simgrid::instr::NoOpTIData("finalize"));
888
889   smpi_process()->finalize();
890
891   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
892   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
893 }
894
895 /** @brief chain a replay initialization and a replay start */
896 void smpi_replay_run(int* argc, char*** argv)
897 {
898   smpi_replay_init(argc, argv);
899   smpi_replay_main(argc, argv);
900 }