Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e9ad9140fc308cd4d1b9f5de447a4706b3e31df3
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <sstream>
20 #include <vector>
21
22 #include <tuple>
23 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
24 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
25 // this could go into a header file.
26 namespace hash_tuple {
27 template <typename TT> class hash {
28 public:
29   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
30 };
31
32 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
33 {
34   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
35 }
36
37 // Recursive template code derived from Matthieu M.
38 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
39 public:
40   static void apply(size_t& seed, Tuple const& tuple)
41   {
42     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
43     hash_combine(seed, std::get<Index>(tuple));
44   }
45 };
46
47 template <class Tuple> class HashValueImpl<Tuple, 0> {
48 public:
49   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
50 };
51
52 template <typename... TT> class hash<std::tuple<TT...>> {
53 public:
54   size_t operator()(std::tuple<TT...> const& tt) const
55   {
56     size_t seed = 0;
57     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
58     return seed;
59   }
60 };
61 }
62
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
64
65 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67
68 static MPI_Datatype MPI_DEFAULT_TYPE;
69
70 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
71   {                                                                                                                    \
72     if (action.size() < static_cast<unsigned long>(mandatory + 2)) {                                                   \
73       std::stringstream ss;                                                                                            \
74       for (const auto& elem : action) {                                                                                \
75         ss << elem << " ";                                                                                             \
76       }                                                                                                                \
77       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
78                            "%zu items were given on the line. First two should be process_id and action.  "            \
79                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
80                            "The full line that was given is:\n   %s\n"                                                 \
81                            "Please contact the Simgrid team if support is needed",                                     \
82              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional),     \
83              ss.str().c_str());                                                                                        \
84     }                                                                                                                  \
85   }
86
87 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
88 {
89   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
90     std::string s = boost::algorithm::join(action, " ");
91     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
92   }
93 }
94
95 /* Helper function */
96 static double parse_double(std::string string)
97 {
98   return xbt_str_parse_double(string.c_str(), "%s is not a double");
99 }
100
101 namespace simgrid {
102 namespace smpi {
103
104 namespace replay {
105
106 class RequestStorage {
107 private:
108     req_storage_t store;
109
110 public:
111     RequestStorage() {}
112     int size()
113     {
114       return store.size();
115     }
116
117     req_storage_t& get_store()
118     {
119       return store;
120     }
121
122     void get_requests(std::vector<MPI_Request>& vec)
123     {
124       for (auto& pair : store) {
125         auto& req = pair.second;
126         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
127         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
128           vec.push_back(pair.second);
129           pair.second->print_request("MM");
130         }
131       }
132     }
133
134     MPI_Request find(int src, int dst, int tag)
135     {
136       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
137       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
138     }
139
140     void remove(MPI_Request req)
141     {
142       if (req == MPI_REQUEST_NULL) return;
143
144       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
145     }
146
147     void add(MPI_Request req)
148     {
149       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
150         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
151     }
152
153     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
154     void addNullRequest(int src, int dst, int tag)
155     {
156       store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
157     }
158 };
159
160 class ActionArgParser {
161 public:
162   virtual ~ActionArgParser() = default;
163   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
164 };
165
166 class WaitTestParser : public ActionArgParser {
167 public:
168   int src;
169   int dst;
170   int tag;
171
172   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
173   {
174     CHECK_ACTION_PARAMS(action, 3, 0)
175     src = std::stoi(action[2]);
176     dst = std::stoi(action[3]);
177     tag = std::stoi(action[4]);
178   }
179 };
180
181 class SendRecvParser : public ActionArgParser {
182 public:
183   /* communication partner; if we send, this is the receiver and vice versa */
184   int partner;
185   double size;
186   int tag;
187   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
188
189   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
190   {
191     CHECK_ACTION_PARAMS(action, 3, 1)
192     partner = std::stoi(action[2]);
193     tag     = std::stoi(action[3]);
194     size    = parse_double(action[4]);
195     if (action.size() > 5)
196       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
197   }
198 };
199
200 class ComputeParser : public ActionArgParser {
201 public:
202   /* communication partner; if we send, this is the receiver and vice versa */
203   double flops;
204
205   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
206   {
207     CHECK_ACTION_PARAMS(action, 1, 0)
208     flops = parse_double(action[2]);
209   }
210 };
211
212 class CollCommParser : public ActionArgParser {
213 public:
214   double size;
215   double comm_size;
216   double comp_size;
217   int send_size;
218   int recv_size;
219   int root = 0;
220   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
221   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
222 };
223
224 class BcastArgParser : public CollCommParser {
225 public:
226   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
227   {
228     CHECK_ACTION_PARAMS(action, 1, 2)
229     size = parse_double(action[2]);
230     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
231     if (action.size() > 4)
232       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
233   }
234 };
235
236 class ReduceArgParser : public CollCommParser {
237 public:
238   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
239   {
240     CHECK_ACTION_PARAMS(action, 2, 2)
241     comm_size = parse_double(action[2]);
242     comp_size = parse_double(action[3]);
243     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
244     if (action.size() > 5)
245       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
246   }
247 };
248
249 class AllReduceArgParser : public CollCommParser {
250 public:
251   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
252   {
253     CHECK_ACTION_PARAMS(action, 2, 1)
254     comm_size = parse_double(action[2]);
255     comp_size = parse_double(action[3]);
256     if (action.size() > 4)
257       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
258   }
259 };
260
261 class AllToAllArgParser : public CollCommParser {
262 public:
263   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
264   {
265     CHECK_ACTION_PARAMS(action, 2, 1)
266     comm_size = MPI_COMM_WORLD->size();
267     send_size = parse_double(action[2]);
268     recv_size = parse_double(action[3]);
269
270     if (action.size() > 4)
271       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
272     if (action.size() > 5)
273       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
274   }
275 };
276
277 class GatherArgParser : public CollCommParser {
278 public:
279   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
280   {
281     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
282           0 gather 68 68 0 0 0
283         where:
284           1) 68 is the sendcounts
285           2) 68 is the recvcounts
286           3) 0 is the root node
287           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
288           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
289     */
290     CHECK_ACTION_PARAMS(action, 2, 3)
291     comm_size = MPI_COMM_WORLD->size();
292     send_size = parse_double(action[2]);
293     recv_size = parse_double(action[3]);
294
295     if (name == "gather") {
296       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
297       if (action.size() > 5)
298         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
299       if (action.size() > 6)
300         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
301     }
302     else {
303       if (action.size() > 4)
304         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
305       if (action.size() > 5)
306         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
307     }
308   }
309 };
310
311 class GatherVArgParser : public CollCommParser {
312 public:
313   int recv_size_sum;
314   std::shared_ptr<std::vector<int>> recvcounts;
315   std::vector<int> disps;
316   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
317   {
318     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
319          0 gather 68 68 10 10 10 0 0 0
320        where:
321          1) 68 is the sendcount
322          2) 68 10 10 10 is the recvcounts
323          3) 0 is the root node
324          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
326     */
327     comm_size = MPI_COMM_WORLD->size();
328     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
329     send_size = parse_double(action[2]);
330     disps     = std::vector<int>(comm_size, 0);
331     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
332
333     if (name == "gatherV") {
334       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
335       if (action.size() > 4 + comm_size)
336         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
337       if (action.size() > 5 + comm_size)
338         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
339     }
340     else {
341       int datatype_index = 0;
342       int disp_index     = 0;
343       /* The 3 comes from "0 gather <sendcount>", which must always be present.
344        * The + comm_size is the recvcounts array, which must also be present
345        */
346       if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
347         datatype_index = 3 + comm_size;
348         disp_index     = datatype_index + 1;
349         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
350         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
351       } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
352         disp_index     = 3 + comm_size;
353       } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
354         datatype_index = 3 + comm_size;
355         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
356         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
357       }
358
359       if (disp_index != 0) {
360         for (unsigned int i = 0; i < comm_size; i++)
361           disps[i]          = std::stoi(action[disp_index + i]);
362       }
363     }
364
365     for (unsigned int i = 0; i < comm_size; i++) {
366       (*recvcounts)[i] = std::stoi(action[i + 3]);
367     }
368     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
369   }
370 };
371
372 class ScatterArgParser : public CollCommParser {
373 public:
374   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
375   {
376     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
377           0 gather 68 68 0 0 0
378         where:
379           1) 68 is the sendcounts
380           2) 68 is the recvcounts
381           3) 0 is the root node
382           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
383           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
384     */
385     CHECK_ACTION_PARAMS(action, 2, 3)
386     comm_size   = MPI_COMM_WORLD->size();
387     send_size   = parse_double(action[2]);
388     recv_size   = parse_double(action[3]);
389     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
390     if (action.size() > 5)
391       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
392     if (action.size() > 6)
393       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
394   }
395 };
396
397 class ScatterVArgParser : public CollCommParser {
398 public:
399   int recv_size_sum;
400   int send_size_sum;
401   std::shared_ptr<std::vector<int>> sendcounts;
402   std::vector<int> disps;
403   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
404   {
405     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
406        0 gather 68 10 10 10 68 0 0 0
407         where:
408         1) 68 10 10 10 is the sendcounts
409         2) 68 is the recvcount
410         3) 0 is the root node
411         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
412         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
413     */
414     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
415     recv_size  = parse_double(action[2 + comm_size]);
416     disps      = std::vector<int>(comm_size, 0);
417     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
418
419     if (action.size() > 5 + comm_size)
420       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
421     if (action.size() > 5 + comm_size)
422       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
423
424     for (unsigned int i = 0; i < comm_size; i++) {
425       (*sendcounts)[i] = std::stoi(action[i + 2]);
426     }
427     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
428     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
429   }
430 };
431
432 class ReduceScatterArgParser : public CollCommParser {
433 public:
434   int recv_size_sum;
435   std::shared_ptr<std::vector<int>> recvcounts;
436   std::vector<int> disps;
437   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
438   {
439     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
440          0 reduceScatter 275427 275427 275427 204020 11346849 0
441        where:
442          1) The first four values after the name of the action declare the recvcounts array
443          2) The value 11346849 is the amount of instructions
444          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
445     */
446     comm_size = MPI_COMM_WORLD->size();
447     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
448     comp_size = parse_double(action[2+comm_size]);
449     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
450     if (action.size() > 3 + comm_size)
451       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
452
453     for (unsigned int i = 0; i < comm_size; i++) {
454       recvcounts->push_back(std::stoi(action[i + 2]));
455     }
456     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
457   }
458 };
459
460 class AllToAllVArgParser : public CollCommParser {
461 public:
462   int recv_size_sum;
463   int send_size_sum;
464   std::shared_ptr<std::vector<int>> recvcounts;
465   std::shared_ptr<std::vector<int>> sendcounts;
466   std::vector<int> senddisps;
467   std::vector<int> recvdisps;
468   int send_buf_size;
469   int recv_buf_size;
470   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
471   {
472     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
473           0 allToAllV 100 1 7 10 12 100 1 70 10 5
474        where:
475         1) 100 is the size of the send buffer *sizeof(int),
476         2) 1 7 10 12 is the sendcounts array
477         3) 100*sizeof(int) is the size of the receiver buffer
478         4)  1 70 10 5 is the recvcounts array
479     */
480     comm_size = MPI_COMM_WORLD->size();
481     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
482     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
483     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
484     senddisps  = std::vector<int>(comm_size, 0);
485     recvdisps  = std::vector<int>(comm_size, 0);
486
487     if (action.size() > 5 + 2 * comm_size)
488       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
489     if (action.size() > 5 + 2 * comm_size)
490       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
491
492     send_buf_size=parse_double(action[2]);
493     recv_buf_size=parse_double(action[3+comm_size]);
494     for (unsigned int i = 0; i < comm_size; i++) {
495       (*sendcounts)[i] = std::stoi(action[3 + i]);
496       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
497     }
498     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
499     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
500   }
501 };
502
503 template <class T> class ReplayAction {
504 protected:
505   const std::string name;
506   RequestStorage* req_storage; // Points to the right storage for this process, nullptr except for Send/Recv/Wait/Test actions.
507   const int my_proc_id;
508   T args;
509
510 public:
511   explicit ReplayAction(std::string name, RequestStorage& storage) : name(name), req_storage(&storage), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
512   explicit ReplayAction(std::string name) : name(name), req_storage(nullptr), my_proc_id(simgrid::s4u::this_actor::get_pid()) {}
513   virtual ~ReplayAction() = default;
514
515   virtual void execute(simgrid::xbt::ReplayAction& action)
516   {
517     // Needs to be re-initialized for every action, hence here
518     double start_time = smpi_process()->simulated_elapsed();
519     args.parse(action, name);
520     kernel(action);
521     if (name != "Init")
522       log_timed_action(action, start_time);
523   }
524
525   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
526
527   void* send_buffer(int size)
528   {
529     return smpi_get_tmp_sendbuffer(size);
530   }
531
532   void* recv_buffer(int size)
533   {
534     return smpi_get_tmp_recvbuffer(size);
535   }
536 };
537
538 class WaitAction : public ReplayAction<WaitTestParser> {
539 public:
540   explicit WaitAction(RequestStorage& storage) : ReplayAction("Wait", storage) {}
541   void kernel(simgrid::xbt::ReplayAction& action) override
542   {
543     std::string s = boost::algorithm::join(action, " ");
544     xbt_assert(req_storage->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
545     MPI_Request request = req_storage->find(args.src, args.dst, args.tag);
546     req_storage->remove(request);
547
548     if (request == MPI_REQUEST_NULL) {
549       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
550        * return.*/
551       return;
552     }
553
554     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
555
556     // Must be taken before Request::wait() since the request may be set to
557     // MPI_REQUEST_NULL by Request::wait!
558     bool is_wait_for_receive = (request->flags() & RECV);
559     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
560     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
561
562     MPI_Status status;
563     Request::wait(&request, &status);
564
565     TRACE_smpi_comm_out(rank);
566     if (is_wait_for_receive)
567       TRACE_smpi_recv(args.src, args.dst, args.tag);
568   }
569 };
570
571 class SendAction : public ReplayAction<SendRecvParser> {
572 public:
573   SendAction() = delete;
574   explicit SendAction(std::string name, RequestStorage& storage) : ReplayAction(name, storage) {}
575   void kernel(simgrid::xbt::ReplayAction& action) override
576   {
577     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
578
579     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
580                                                                              args.tag, Datatype::encode(args.datatype1)));
581     if (not TRACE_smpi_view_internals())
582       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
583
584     if (name == "send") {
585       Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
586     } else if (name == "Isend") {
587       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
588       req_storage->add(request);
589     } else {
590       xbt_die("Don't know this action, %s", name.c_str());
591     }
592
593     TRACE_smpi_comm_out(my_proc_id);
594   }
595 };
596
597 class RecvAction : public ReplayAction<SendRecvParser> {
598 public:
599   RecvAction() = delete;
600   explicit RecvAction(std::string name, RequestStorage& storage) : ReplayAction(name, storage) {}
601   void kernel(simgrid::xbt::ReplayAction& action) override
602   {
603     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
604
605     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
606                                                                              args.tag, Datatype::encode(args.datatype1)));
607
608     MPI_Status status;
609     // unknown size from the receiver point of view
610     if (args.size <= 0.0) {
611       Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
612       args.size = status.count;
613     }
614
615     if (name == "recv") {
616       Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
617     } else if (name == "Irecv") {
618       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
619       req_storage->add(request);
620     }
621
622     TRACE_smpi_comm_out(my_proc_id);
623     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
624     if (name == "recv" && not TRACE_smpi_view_internals()) {
625       TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
626     }
627   }
628 };
629
630 class ComputeAction : public ReplayAction<ComputeParser> {
631 public:
632   ComputeAction() : ReplayAction("compute") {}
633   void kernel(simgrid::xbt::ReplayAction& action) override
634   {
635     TRACE_smpi_computing_in(my_proc_id, args.flops);
636     smpi_execute_flops(args.flops);
637     TRACE_smpi_computing_out(my_proc_id);
638   }
639 };
640
641 class TestAction : public ReplayAction<WaitTestParser> {
642 public:
643   explicit TestAction(RequestStorage& storage) : ReplayAction("Test", storage) {}
644   void kernel(simgrid::xbt::ReplayAction& action) override
645   {
646     MPI_Request request = req_storage->find(args.src, args.dst, args.tag);
647     req_storage->remove(request);
648     // if request is null here, this may mean that a previous test has succeeded
649     // Different times in traced application and replayed version may lead to this
650     // In this case, ignore the extra calls.
651     if (request != MPI_REQUEST_NULL) {
652       TRACE_smpi_testing_in(my_proc_id);
653
654       MPI_Status status;
655       int flag = Request::test(&request, &status);
656
657       XBT_DEBUG("MPI_Test result: %d", flag);
658       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
659        * nullptr.*/
660       if (request == MPI_REQUEST_NULL)
661         req_storage->addNullRequest(args.src, args.dst, args.tag);
662       else
663         req_storage->add(request);
664
665       TRACE_smpi_testing_out(my_proc_id);
666     }
667   }
668 };
669
670 class InitAction : public ReplayAction<ActionArgParser> {
671 public:
672   InitAction() : ReplayAction("Init") {}
673   void kernel(simgrid::xbt::ReplayAction& action) override
674   {
675     CHECK_ACTION_PARAMS(action, 0, 1)
676     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
677                                            : MPI_BYTE;  // default TAU datatype
678
679     /* start a simulated timer */
680     smpi_process()->simulated_start();
681   }
682 };
683
684 class CommunicatorAction : public ReplayAction<ActionArgParser> {
685 public:
686   CommunicatorAction() : ReplayAction("Comm") {}
687   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
688 };
689
690 class WaitAllAction : public ReplayAction<ActionArgParser> {
691 public:
692   explicit WaitAllAction(RequestStorage& storage) : ReplayAction("waitAll", storage) {}
693   void kernel(simgrid::xbt::ReplayAction& action) override
694   {
695     const unsigned int count_requests = req_storage->size();
696
697     if (count_requests > 0) {
698       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
699       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
700       std::vector<MPI_Request> reqs;
701       req_storage->get_requests(reqs);
702       for (const auto& req : reqs) {
703         if (req && (req->flags() & RECV)) {
704           sender_receiver.push_back({req->src(), req->dst()});
705         }
706       }
707       MPI_Status status[count_requests];
708       Request::waitall(count_requests, &(reqs.data())[0], status);
709       req_storage->get_store().clear();
710
711       for (auto& pair : sender_receiver) {
712         TRACE_smpi_recv(pair.first, pair.second, 0);
713       }
714       TRACE_smpi_comm_out(my_proc_id);
715     }
716   }
717 };
718
719 class BarrierAction : public ReplayAction<ActionArgParser> {
720 public:
721   BarrierAction() : ReplayAction("barrier") {}
722   void kernel(simgrid::xbt::ReplayAction& action) override
723   {
724     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
725     Colls::barrier(MPI_COMM_WORLD);
726     TRACE_smpi_comm_out(my_proc_id);
727   }
728 };
729
730 class BcastAction : public ReplayAction<BcastArgParser> {
731 public:
732   BcastAction() : ReplayAction("bcast") {}
733   void kernel(simgrid::xbt::ReplayAction& action) override
734   {
735     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
736                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
737                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
738
739     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
740
741     TRACE_smpi_comm_out(my_proc_id);
742   }
743 };
744
745 class ReduceAction : public ReplayAction<ReduceArgParser> {
746 public:
747   ReduceAction() : ReplayAction("reduce") {}
748   void kernel(simgrid::xbt::ReplayAction& action) override
749   {
750     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
751                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
752                                                       args.comp_size, args.comm_size, -1,
753                                                       Datatype::encode(args.datatype1), ""));
754
755     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
756         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
757     smpi_execute_flops(args.comp_size);
758
759     TRACE_smpi_comm_out(my_proc_id);
760   }
761 };
762
763 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
764 public:
765   AllReduceAction() : ReplayAction("allReduce") {}
766   void kernel(simgrid::xbt::ReplayAction& action) override
767   {
768     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
769                                                                                 Datatype::encode(args.datatype1), ""));
770
771     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
772         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
773     smpi_execute_flops(args.comp_size);
774
775     TRACE_smpi_comm_out(my_proc_id);
776   }
777 };
778
779 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
780 public:
781   AllToAllAction() : ReplayAction("allToAll") {}
782   void kernel(simgrid::xbt::ReplayAction& action) override
783   {
784     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
785                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
786                                                     Datatype::encode(args.datatype1),
787                                                     Datatype::encode(args.datatype2)));
788
789     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
790                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
791                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
792
793     TRACE_smpi_comm_out(my_proc_id);
794   }
795 };
796
797 class GatherAction : public ReplayAction<GatherArgParser> {
798 public:
799   explicit GatherAction(std::string name) : ReplayAction(name) {}
800   void kernel(simgrid::xbt::ReplayAction& action) override
801   {
802     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
803                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
804
805     if (name == "gather") {
806       int rank = MPI_COMM_WORLD->rank();
807       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
808                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
809     }
810     else
811       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
812                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
813
814     TRACE_smpi_comm_out(my_proc_id);
815   }
816 };
817
818 class GatherVAction : public ReplayAction<GatherVArgParser> {
819 public:
820   explicit GatherVAction(std::string name) : ReplayAction(name) {}
821   void kernel(simgrid::xbt::ReplayAction& action) override
822   {
823     int rank = MPI_COMM_WORLD->rank();
824
825     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
826                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
827                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
828
829     if (name == "gatherV") {
830       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
831                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
832                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
833     }
834     else {
835       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
836                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
837                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
838     }
839
840     TRACE_smpi_comm_out(my_proc_id);
841   }
842 };
843
844 class ScatterAction : public ReplayAction<ScatterArgParser> {
845 public:
846   ScatterAction() : ReplayAction("scatter") {}
847   void kernel(simgrid::xbt::ReplayAction& action) override
848   {
849     int rank = MPI_COMM_WORLD->rank();
850     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
851                                                                           Datatype::encode(args.datatype1),
852                                                                           Datatype::encode(args.datatype2)));
853
854     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
855                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
856
857     TRACE_smpi_comm_out(my_proc_id);
858   }
859 };
860
861
862 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
863 public:
864   ScatterVAction() : ReplayAction("scatterV") {}
865   void kernel(simgrid::xbt::ReplayAction& action) override
866   {
867     int rank = MPI_COMM_WORLD->rank();
868     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
869           nullptr, Datatype::encode(args.datatype1),
870           Datatype::encode(args.datatype2)));
871
872     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
873                     args.sendcounts->data(), args.disps.data(), args.datatype1,
874                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
875                     MPI_COMM_WORLD);
876
877     TRACE_smpi_comm_out(my_proc_id);
878   }
879 };
880
881 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
882 public:
883   ReduceScatterAction() : ReplayAction("reduceScatter") {}
884   void kernel(simgrid::xbt::ReplayAction& action) override
885   {
886     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
887                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
888                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
889                                                          Datatype::encode(args.datatype1)));
890
891     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
892                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
893                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
894
895     smpi_execute_flops(args.comp_size);
896     TRACE_smpi_comm_out(my_proc_id);
897   }
898 };
899
900 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
901 public:
902   AllToAllVAction() : ReplayAction("allToAllV") {}
903   void kernel(simgrid::xbt::ReplayAction& action) override
904   {
905     TRACE_smpi_comm_in(my_proc_id, __func__,
906                        new simgrid::instr::VarCollTIData(
907                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
908                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
909
910     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
911                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
912
913     TRACE_smpi_comm_out(my_proc_id);
914   }
915 };
916 } // Replay Namespace
917 }} // namespace simgrid::smpi
918
919 std::vector<simgrid::smpi::replay::RequestStorage> storage;
920 /** @brief Only initialize the replay, don't do it for real */
921 void smpi_replay_init(int* argc, char*** argv)
922 {
923   simgrid::smpi::Process::init(argc, argv);
924   smpi_process()->mark_as_initialized();
925   smpi_process()->set_replaying(true);
926
927   int my_proc_id = simgrid::s4u::this_actor::get_pid();
928   storage.resize(smpi_process_count());
929
930   TRACE_smpi_init(my_proc_id);
931   TRACE_smpi_computing_init(my_proc_id);
932   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
933   TRACE_smpi_comm_out(my_proc_id);
934   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
935   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
936   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
937   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
938   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
939
940   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
941   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
942   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
943   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
944   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
945   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
946   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
947   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
948   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
949   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
950   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
951   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
952   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
953   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
954   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
955   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
956   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
957   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
958   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
959   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
960   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
961
962   //if we have a delayed start, sleep here.
963   if(*argc>2){
964     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
965     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
966     smpi_execute_flops(value);
967   } else {
968     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
969     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
970     smpi_execute_flops(0.0);
971   }
972 }
973
974 /** @brief actually run the replay after initialization */
975 void smpi_replay_main(int* argc, char*** argv)
976 {
977   static int active_processes = 0;
978   active_processes++;
979   simgrid::xbt::replay_runner(*argc, *argv);
980
981   /* and now, finalize everything */
982   /* One active process will stop. Decrease the counter*/
983   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
984   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
985   if (count_requests > 0) {
986     MPI_Request requests[count_requests];
987     MPI_Status status[count_requests];
988     unsigned int i=0;
989
990     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
991       requests[i] = pair.second;
992       i++;
993     }
994     simgrid::smpi::Request::waitall(count_requests, requests, status);
995   }
996   active_processes--;
997
998   if(active_processes==0){
999     /* Last process alive speaking: end the simulated timer */
1000     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
1001     smpi_free_replay_tmp_buffers();
1002   }
1003
1004   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
1005                      new simgrid::instr::NoOpTIData("finalize"));
1006
1007   smpi_process()->finalize();
1008
1009   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
1010   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
1011 }
1012
1013 /** @brief chain a replay initialization and a replay start */
1014 void smpi_replay_run(int* argc, char*** argv)
1015 {
1016   smpi_replay_init(argc, argv);
1017   smpi_replay_main(argc, argv);
1018 }