Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7fc301d672ace8f3e8da4c72fb6661fee592ef23
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14 #include <simgrid/smpi/replay.hpp>
15
16 #include <boost/algorithm/string/join.hpp>
17 #include <memory>
18 #include <numeric>
19 #include <unordered_map>
20 #include <vector>
21
22 #include <tuple>
23 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
24 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
25 // this could go into a header file.
26 namespace hash_tuple {
27 template <typename TT> class hash {
28 public:
29   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
30 };
31
32 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
33 {
34   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
35 }
36
37 // Recursive template code derived from Matthieu M.
38 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
39 public:
40   static void apply(size_t& seed, Tuple const& tuple)
41   {
42     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
43     hash_combine(seed, std::get<Index>(tuple));
44   }
45 };
46
47 template <class Tuple> class HashValueImpl<Tuple, 0> {
48 public:
49   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
50 };
51
52 template <typename... TT> class hash<std::tuple<TT...>> {
53 public:
54   size_t operator()(std::tuple<TT...> const& tt) const
55   {
56     size_t seed = 0;
57     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
58     return seed;
59   }
60 };
61 }
62
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
64
65 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67
68
69 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
70 {
71   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
72     std::string s = boost::algorithm::join(action, " ");
73     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
74   }
75 }
76
77 /* Helper function */
78 static double parse_double(std::string string)
79 {
80   return xbt_str_parse_double(string.c_str(), "%s is not a double");
81 }
82
83 namespace simgrid {
84 namespace smpi {
85
86 namespace replay {
87 MPI_Datatype MPI_DEFAULT_TYPE;
88
89 class RequestStorage {
90 private:
91     req_storage_t store;
92
93 public:
94     RequestStorage() {}
95     int size()
96     {
97       return store.size();
98     }
99
100     req_storage_t& get_store()
101     {
102       return store;
103     }
104
105     void get_requests(std::vector<MPI_Request>& vec)
106     {
107       for (auto& pair : store) {
108         auto& req = pair.second;
109         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
110         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
111           vec.push_back(pair.second);
112           pair.second->print_request("MM");
113         }
114       }
115     }
116
117     MPI_Request find(int src, int dst, int tag)
118     {
119       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
120       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121     }
122
123     void remove(MPI_Request req)
124     {
125       if (req == MPI_REQUEST_NULL) return;
126
127       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128     }
129
130     void add(MPI_Request req)
131     {
132       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
133         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134     }
135
136     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
137     void addNullRequest(int src, int dst, int tag)
138     {
139       store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
140     }
141 };
142
143 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
144 {
145   CHECK_ACTION_PARAMS(action, 3, 0)
146   src = std::stoi(action[2]);
147   dst = std::stoi(action[3]);
148   tag = std::stoi(action[4]);
149 }
150
151 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
152 {
153   CHECK_ACTION_PARAMS(action, 3, 1)
154   partner = std::stoi(action[2]);
155   tag     = std::stoi(action[3]);
156   size    = parse_double(action[4]);
157   if (action.size() > 5)
158     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
159 }
160
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 2)
171   size = parse_double(action[2]);
172   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173   if (action.size() > 4)
174     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
175 }
176
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
178 {
179   CHECK_ACTION_PARAMS(action, 2, 2)
180   comm_size = parse_double(action[2]);
181   comp_size = parse_double(action[3]);
182   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
183   if (action.size() > 5)
184     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
185 }
186
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
188 {
189   CHECK_ACTION_PARAMS(action, 2, 1)
190   comm_size = parse_double(action[2]);
191   comp_size = parse_double(action[3]);
192   if (action.size() > 4)
193     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
194 }
195
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
197 {
198   CHECK_ACTION_PARAMS(action, 2, 1)
199   comm_size = MPI_COMM_WORLD->size();
200   send_size = parse_double(action[2]);
201   recv_size = parse_double(action[3]);
202
203   if (action.size() > 4)
204     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205   if (action.size() > 5)
206     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
207 }
208
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
210 {
211   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
212         0 gather 68 68 0 0 0
213       where:
214         1) 68 is the sendcounts
215         2) 68 is the recvcounts
216         3) 0 is the root node
217         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
219   */
220   CHECK_ACTION_PARAMS(action, 2, 3)
221   comm_size = MPI_COMM_WORLD->size();
222   send_size = parse_double(action[2]);
223   recv_size = parse_double(action[3]);
224
225   if (name == "gather") {
226     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
227     if (action.size() > 5)
228       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229     if (action.size() > 6)
230       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
231   }
232   else {
233     if (action.size() > 4)
234       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
235     if (action.size() > 5)
236       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
237   }
238 }
239
240 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
241 {
242   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
243        0 gather 68 68 10 10 10 0 0 0
244      where:
245        1) 68 is the sendcount
246        2) 68 10 10 10 is the recvcounts
247        3) 0 is the root node
248        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
249        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
250   */
251   comm_size = MPI_COMM_WORLD->size();
252   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
253   send_size = parse_double(action[2]);
254   disps     = std::vector<int>(comm_size, 0);
255   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
256
257   if (name == "gatherV") {
258     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
259     if (action.size() > 4 + comm_size)
260       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
261     if (action.size() > 5 + comm_size)
262       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
263   }
264   else {
265     int datatype_index = 0;
266     int disp_index     = 0;
267     /* The 3 comes from "0 gather <sendcount>", which must always be present.
268      * The + comm_size is the recvcounts array, which must also be present
269      */
270     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
271       datatype_index = 3 + comm_size;
272       disp_index     = datatype_index + 1;
273       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
274       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
275     } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
276       disp_index     = 3 + comm_size;
277     } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
278       datatype_index = 3 + comm_size;
279       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
280       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
281     }
282
283     if (disp_index != 0) {
284       for (unsigned int i = 0; i < comm_size; i++)
285         disps[i]          = std::stoi(action[disp_index + i]);
286     }
287   }
288
289   for (unsigned int i = 0; i < comm_size; i++) {
290     (*recvcounts)[i] = std::stoi(action[i + 3]);
291   }
292   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
293 }
294
295 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
296 {
297   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
298         0 gather 68 68 0 0 0
299       where:
300         1) 68 is the sendcounts
301         2) 68 is the recvcounts
302         3) 0 is the root node
303         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
304         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305   */
306   CHECK_ACTION_PARAMS(action, 2, 3)
307   comm_size   = MPI_COMM_WORLD->size();
308   send_size   = parse_double(action[2]);
309   recv_size   = parse_double(action[3]);
310   root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
311   if (action.size() > 5)
312     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
313   if (action.size() > 6)
314     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
315 }
316
317 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
318 {
319   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
320      0 gather 68 10 10 10 68 0 0 0
321       where:
322       1) 68 10 10 10 is the sendcounts
323       2) 68 is the recvcount
324       3) 0 is the root node
325       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
326       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
327   */
328   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
329   recv_size  = parse_double(action[2 + comm_size]);
330   disps      = std::vector<int>(comm_size, 0);
331   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
332
333   if (action.size() > 5 + comm_size)
334     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
335   if (action.size() > 5 + comm_size)
336     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
337
338   for (unsigned int i = 0; i < comm_size; i++) {
339     (*sendcounts)[i] = std::stoi(action[i + 2]);
340   }
341   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
342   root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
343 }
344
345 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
346 {
347   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
348        0 reduceScatter 275427 275427 275427 204020 11346849 0
349      where:
350        1) The first four values after the name of the action declare the recvcounts array
351        2) The value 11346849 is the amount of instructions
352        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
353   */
354   comm_size = MPI_COMM_WORLD->size();
355   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
356   comp_size = parse_double(action[2+comm_size]);
357   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
358   if (action.size() > 3 + comm_size)
359     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
360
361   for (unsigned int i = 0; i < comm_size; i++) {
362     recvcounts->push_back(std::stoi(action[i + 2]));
363   }
364   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
365 }
366
367 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
368 {
369   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
370         0 allToAllV 100 1 7 10 12 100 1 70 10 5
371      where:
372       1) 100 is the size of the send buffer *sizeof(int),
373       2) 1 7 10 12 is the sendcounts array
374       3) 100*sizeof(int) is the size of the receiver buffer
375       4)  1 70 10 5 is the recvcounts array
376   */
377   comm_size = MPI_COMM_WORLD->size();
378   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
379   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
381   senddisps  = std::vector<int>(comm_size, 0);
382   recvdisps  = std::vector<int>(comm_size, 0);
383
384   if (action.size() > 5 + 2 * comm_size)
385     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
386   if (action.size() > 5 + 2 * comm_size)
387     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
388
389   send_buf_size=parse_double(action[2]);
390   recv_buf_size=parse_double(action[3+comm_size]);
391   for (unsigned int i = 0; i < comm_size; i++) {
392     (*sendcounts)[i] = std::stoi(action[3 + i]);
393     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
394   }
395   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
396   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
397 }
398
399 template<class T>
400 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
401 {
402   // Needs to be re-initialized for every action, hence here
403   double start_time = smpi_process()->simulated_elapsed();
404   args.parse(action, name);
405   kernel(action);
406   if (name != "Init")
407     log_timed_action(action, start_time);
408 }
409
410 class WaitAction : public ReplayAction<WaitTestParser> {
411 private:
412   RequestStorage& req_storage;
413
414 public:
415   explicit WaitAction(RequestStorage& storage) : ReplayAction("Wait"), req_storage(storage) {}
416   void kernel(simgrid::xbt::ReplayAction& action) override
417   {
418     std::string s = boost::algorithm::join(action, " ");
419     xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
420     MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
421     req_storage.remove(request);
422
423     if (request == MPI_REQUEST_NULL) {
424       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
425        * return.*/
426       return;
427     }
428
429     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
430
431     // Must be taken before Request::wait() since the request may be set to
432     // MPI_REQUEST_NULL by Request::wait!
433     bool is_wait_for_receive = (request->flags() & RECV);
434     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
435     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
436
437     MPI_Status status;
438     Request::wait(&request, &status);
439
440     TRACE_smpi_comm_out(rank);
441     if (is_wait_for_receive)
442       TRACE_smpi_recv(args.src, args.dst, args.tag);
443   }
444 };
445
446 class SendAction : public ReplayAction<SendRecvParser> {
447 private:
448   RequestStorage& req_storage;
449
450 public:
451   explicit SendAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
452   void kernel(simgrid::xbt::ReplayAction& action) override
453   {
454     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
455
456     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
457                                                                              args.tag, Datatype::encode(args.datatype1)));
458     if (not TRACE_smpi_view_internals())
459       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
460
461     if (name == "send") {
462       Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
463     } else if (name == "Isend") {
464       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
465       req_storage.add(request);
466     } else {
467       xbt_die("Don't know this action, %s", name.c_str());
468     }
469
470     TRACE_smpi_comm_out(my_proc_id);
471   }
472 };
473
474 class RecvAction : public ReplayAction<SendRecvParser> {
475 private:
476   RequestStorage& req_storage;
477
478 public:
479   explicit RecvAction(std::string name, RequestStorage& storage) : ReplayAction(name), req_storage(storage) {}
480   void kernel(simgrid::xbt::ReplayAction& action) override
481   {
482     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
483
484     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
485                                                                              args.tag, Datatype::encode(args.datatype1)));
486
487     MPI_Status status;
488     // unknown size from the receiver point of view
489     if (args.size <= 0.0) {
490       Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
491       args.size = status.count;
492     }
493
494     if (name == "recv") {
495       Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
496     } else if (name == "Irecv") {
497       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
498       req_storage.add(request);
499     }
500
501     TRACE_smpi_comm_out(my_proc_id);
502     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
503     if (name == "recv" && not TRACE_smpi_view_internals()) {
504       TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
505     }
506   }
507 };
508
509 class ComputeAction : public ReplayAction<ComputeParser> {
510 public:
511   ComputeAction() : ReplayAction("compute") {}
512   void kernel(simgrid::xbt::ReplayAction& action) override
513   {
514     TRACE_smpi_computing_in(my_proc_id, args.flops);
515     smpi_execute_flops(args.flops);
516     TRACE_smpi_computing_out(my_proc_id);
517   }
518 };
519
520 class TestAction : public ReplayAction<WaitTestParser> {
521 private:
522   RequestStorage& req_storage;
523
524 public:
525   explicit TestAction(RequestStorage& storage) : ReplayAction("Test"), req_storage(storage) {}
526   void kernel(simgrid::xbt::ReplayAction& action) override
527   {
528     MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
529     req_storage.remove(request);
530     // if request is null here, this may mean that a previous test has succeeded
531     // Different times in traced application and replayed version may lead to this
532     // In this case, ignore the extra calls.
533     if (request != MPI_REQUEST_NULL) {
534       TRACE_smpi_testing_in(my_proc_id);
535
536       MPI_Status status;
537       int flag = Request::test(&request, &status);
538
539       XBT_DEBUG("MPI_Test result: %d", flag);
540       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
541        * nullptr.*/
542       if (request == MPI_REQUEST_NULL)
543         req_storage.addNullRequest(args.src, args.dst, args.tag);
544       else
545         req_storage.add(request);
546
547       TRACE_smpi_testing_out(my_proc_id);
548     }
549   }
550 };
551
552 class InitAction : public ReplayAction<ActionArgParser> {
553 public:
554   InitAction() : ReplayAction("Init") {}
555   void kernel(simgrid::xbt::ReplayAction& action) override
556   {
557     CHECK_ACTION_PARAMS(action, 0, 1)
558     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
559                                            : MPI_BYTE;  // default TAU datatype
560
561     /* start a simulated timer */
562     smpi_process()->simulated_start();
563   }
564 };
565
566 class CommunicatorAction : public ReplayAction<ActionArgParser> {
567 public:
568   CommunicatorAction() : ReplayAction("Comm") {}
569   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
570 };
571
572 class WaitAllAction : public ReplayAction<ActionArgParser> {
573 private:
574   RequestStorage& req_storage;
575
576 public:
577   explicit WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll"), req_storage(storage) {}
578   void kernel(simgrid::xbt::ReplayAction& action) override
579   {
580     const unsigned int count_requests = req_storage.size();
581
582     if (count_requests > 0) {
583       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
584       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
585       std::vector<MPI_Request> reqs;
586       req_storage.get_requests(reqs);
587       for (const auto& req : reqs) {
588         if (req && (req->flags() & RECV)) {
589           sender_receiver.push_back({req->src(), req->dst()});
590         }
591       }
592       MPI_Status status[count_requests];
593       Request::waitall(count_requests, &(reqs.data())[0], status);
594       req_storage.get_store().clear();
595
596       for (auto& pair : sender_receiver) {
597         TRACE_smpi_recv(pair.first, pair.second, 0);
598       }
599       TRACE_smpi_comm_out(my_proc_id);
600     }
601   }
602 };
603
604 class BarrierAction : public ReplayAction<ActionArgParser> {
605 public:
606   BarrierAction() : ReplayAction("barrier") {}
607   void kernel(simgrid::xbt::ReplayAction& action) override
608   {
609     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
610     Colls::barrier(MPI_COMM_WORLD);
611     TRACE_smpi_comm_out(my_proc_id);
612   }
613 };
614
615 class BcastAction : public ReplayAction<BcastArgParser> {
616 public:
617   BcastAction() : ReplayAction("bcast") {}
618   void kernel(simgrid::xbt::ReplayAction& action) override
619   {
620     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
621                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
622                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
623
624     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
625
626     TRACE_smpi_comm_out(my_proc_id);
627   }
628 };
629
630 class ReduceAction : public ReplayAction<ReduceArgParser> {
631 public:
632   ReduceAction() : ReplayAction("reduce") {}
633   void kernel(simgrid::xbt::ReplayAction& action) override
634   {
635     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
636                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
637                                                       args.comp_size, args.comm_size, -1,
638                                                       Datatype::encode(args.datatype1), ""));
639
640     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
641         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
642     smpi_execute_flops(args.comp_size);
643
644     TRACE_smpi_comm_out(my_proc_id);
645   }
646 };
647
648 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
649 public:
650   AllReduceAction() : ReplayAction("allReduce") {}
651   void kernel(simgrid::xbt::ReplayAction& action) override
652   {
653     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
654                                                                                 Datatype::encode(args.datatype1), ""));
655
656     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
657         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
658     smpi_execute_flops(args.comp_size);
659
660     TRACE_smpi_comm_out(my_proc_id);
661   }
662 };
663
664 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
665 public:
666   AllToAllAction() : ReplayAction("allToAll") {}
667   void kernel(simgrid::xbt::ReplayAction& action) override
668   {
669     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
670                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
671                                                     Datatype::encode(args.datatype1),
672                                                     Datatype::encode(args.datatype2)));
673
674     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
675                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
676                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
677
678     TRACE_smpi_comm_out(my_proc_id);
679   }
680 };
681
682 class GatherAction : public ReplayAction<GatherArgParser> {
683 public:
684   explicit GatherAction(std::string name) : ReplayAction(name) {}
685   void kernel(simgrid::xbt::ReplayAction& action) override
686   {
687     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
688                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
689
690     if (name == "gather") {
691       int rank = MPI_COMM_WORLD->rank();
692       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
693                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
694     }
695     else
696       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
697                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
698
699     TRACE_smpi_comm_out(my_proc_id);
700   }
701 };
702
703 class GatherVAction : public ReplayAction<GatherVArgParser> {
704 public:
705   explicit GatherVAction(std::string name) : ReplayAction(name) {}
706   void kernel(simgrid::xbt::ReplayAction& action) override
707   {
708     int rank = MPI_COMM_WORLD->rank();
709
710     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
711                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
712                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
713
714     if (name == "gatherV") {
715       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
716                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
717                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
718     }
719     else {
720       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
721                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
722                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
723     }
724
725     TRACE_smpi_comm_out(my_proc_id);
726   }
727 };
728
729 class ScatterAction : public ReplayAction<ScatterArgParser> {
730 public:
731   ScatterAction() : ReplayAction("scatter") {}
732   void kernel(simgrid::xbt::ReplayAction& action) override
733   {
734     int rank = MPI_COMM_WORLD->rank();
735     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
736                                                                           Datatype::encode(args.datatype1),
737                                                                           Datatype::encode(args.datatype2)));
738
739     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
740                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
741
742     TRACE_smpi_comm_out(my_proc_id);
743   }
744 };
745
746
747 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
748 public:
749   ScatterVAction() : ReplayAction("scatterV") {}
750   void kernel(simgrid::xbt::ReplayAction& action) override
751   {
752     int rank = MPI_COMM_WORLD->rank();
753     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
754           nullptr, Datatype::encode(args.datatype1),
755           Datatype::encode(args.datatype2)));
756
757     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
758                     args.sendcounts->data(), args.disps.data(), args.datatype1,
759                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
760                     MPI_COMM_WORLD);
761
762     TRACE_smpi_comm_out(my_proc_id);
763   }
764 };
765
766 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
767 public:
768   ReduceScatterAction() : ReplayAction("reduceScatter") {}
769   void kernel(simgrid::xbt::ReplayAction& action) override
770   {
771     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
772                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
773                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
774                                                          Datatype::encode(args.datatype1)));
775
776     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
777                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
778                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
779
780     smpi_execute_flops(args.comp_size);
781     TRACE_smpi_comm_out(my_proc_id);
782   }
783 };
784
785 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
786 public:
787   AllToAllVAction() : ReplayAction("allToAllV") {}
788   void kernel(simgrid::xbt::ReplayAction& action) override
789   {
790     TRACE_smpi_comm_in(my_proc_id, __func__,
791                        new simgrid::instr::VarCollTIData(
792                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
793                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
794
795     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
796                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
797
798     TRACE_smpi_comm_out(my_proc_id);
799   }
800 };
801 } // Replay Namespace
802 }} // namespace simgrid::smpi
803
804 std::vector<simgrid::smpi::replay::RequestStorage> storage;
805 /** @brief Only initialize the replay, don't do it for real */
806 void smpi_replay_init(int* argc, char*** argv)
807 {
808   simgrid::smpi::Process::init(argc, argv);
809   smpi_process()->mark_as_initialized();
810   smpi_process()->set_replaying(true);
811
812   int my_proc_id = simgrid::s4u::this_actor::get_pid();
813   storage.resize(smpi_process_count());
814
815   TRACE_smpi_init(my_proc_id);
816   TRACE_smpi_computing_init(my_proc_id);
817   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
818   TRACE_smpi_comm_out(my_proc_id);
819   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
820   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
821   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
822   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
823   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
824   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
825   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
826   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
827   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
828   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
829   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
830   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
831   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
832   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
833   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
834   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
835   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
836   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
837   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
838   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
839   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
840   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
841   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
842   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
843   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
844   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
845
846   //if we have a delayed start, sleep here.
847   if(*argc>2){
848     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
849     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
850     smpi_execute_flops(value);
851   } else {
852     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
853     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
854     smpi_execute_flops(0.0);
855   }
856 }
857
858 /** @brief actually run the replay after initialization */
859 void smpi_replay_main(int* argc, char*** argv)
860 {
861   static int active_processes = 0;
862   active_processes++;
863   simgrid::xbt::replay_runner(*argc, *argv);
864
865   /* and now, finalize everything */
866   /* One active process will stop. Decrease the counter*/
867   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
868   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
869   if (count_requests > 0) {
870     MPI_Request requests[count_requests];
871     MPI_Status status[count_requests];
872     unsigned int i=0;
873
874     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
875       requests[i] = pair.second;
876       i++;
877     }
878     simgrid::smpi::Request::waitall(count_requests, requests, status);
879   }
880   active_processes--;
881
882   if(active_processes==0){
883     /* Last process alive speaking: end the simulated timer */
884     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
885     smpi_free_replay_tmp_buffers();
886   }
887
888   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
889                      new simgrid::instr::NoOpTIData("finalize"));
890
891   smpi_process()->finalize();
892
893   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
894   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
895 }
896
897 /** @brief chain a replay initialization and a replay start */
898 void smpi_replay_run(int* argc, char*** argv)
899 {
900   smpi_replay_init(argc, argv);
901   smpi_replay_main(argc, argv);
902 }