Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Make the 'storage' variable static
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67
68 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 {
70   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
71     std::string s = boost::algorithm::join(action, " ");
72     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
73   }
74 }
75
76 /* Helper function */
77 static double parse_double(std::string string)
78 {
79   return xbt_str_parse_double(string.c_str(), "%s is not a double");
80 }
81
82 namespace simgrid {
83 namespace smpi {
84
85 namespace replay {
86 MPI_Datatype MPI_DEFAULT_TYPE;
87
88 class RequestStorage {
89 private:
90     req_storage_t store;
91
92 public:
93     RequestStorage() {}
94     int size()
95     {
96       return store.size();
97     }
98
99     req_storage_t& get_store()
100     {
101       return store;
102     }
103
104     void get_requests(std::vector<MPI_Request>& vec)
105     {
106       for (auto& pair : store) {
107         auto& req = pair.second;
108         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
109         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
110           vec.push_back(pair.second);
111           pair.second->print_request("MM");
112         }
113       }
114     }
115
116     MPI_Request find(int src, int dst, int tag)
117     {
118       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
119       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
120     }
121
122     void remove(MPI_Request req)
123     {
124       if (req == MPI_REQUEST_NULL) return;
125
126       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
127     }
128
129     void add(MPI_Request req)
130     {
131       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
132         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
133     }
134
135     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
136     void addNullRequest(int src, int dst, int tag)
137     {
138       store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
139     }
140 };
141
142 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
143 {
144   CHECK_ACTION_PARAMS(action, 3, 0)
145   src = std::stoi(action[2]);
146   dst = std::stoi(action[3]);
147   tag = std::stoi(action[4]);
148 }
149
150 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
151 {
152   CHECK_ACTION_PARAMS(action, 3, 1)
153   partner = std::stoi(action[2]);
154   tag     = std::stoi(action[3]);
155   size    = parse_double(action[4]);
156   if (action.size() > 5)
157     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
158 }
159
160 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
161 {
162   CHECK_ACTION_PARAMS(action, 1, 0)
163   flops = parse_double(action[2]);
164 }
165
166 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
167 {
168   CHECK_ACTION_PARAMS(action, 1, 2)
169   size = parse_double(action[2]);
170   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
171   if (action.size() > 4)
172     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
173 }
174
175 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
176 {
177   CHECK_ACTION_PARAMS(action, 2, 2)
178   comm_size = parse_double(action[2]);
179   comp_size = parse_double(action[3]);
180   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
181   if (action.size() > 5)
182     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
183 }
184
185 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
186 {
187   CHECK_ACTION_PARAMS(action, 2, 1)
188   comm_size = parse_double(action[2]);
189   comp_size = parse_double(action[3]);
190   if (action.size() > 4)
191     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
192 }
193
194 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
195 {
196   CHECK_ACTION_PARAMS(action, 2, 1)
197   comm_size = MPI_COMM_WORLD->size();
198   send_size = parse_double(action[2]);
199   recv_size = parse_double(action[3]);
200
201   if (action.size() > 4)
202     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
203   if (action.size() > 5)
204     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
205 }
206
207 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
208 {
209   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
210         0 gather 68 68 0 0 0
211       where:
212         1) 68 is the sendcounts
213         2) 68 is the recvcounts
214         3) 0 is the root node
215         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
216         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
217   */
218   CHECK_ACTION_PARAMS(action, 2, 3)
219   comm_size = MPI_COMM_WORLD->size();
220   send_size = parse_double(action[2]);
221   recv_size = parse_double(action[3]);
222
223   if (name == "gather") {
224     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
225     if (action.size() > 5)
226       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
227     if (action.size() > 6)
228       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
229   } else {
230     if (action.size() > 4)
231       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
232     if (action.size() > 5)
233       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
234   }
235 }
236
237 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
238 {
239   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
240        0 gather 68 68 10 10 10 0 0 0
241      where:
242        1) 68 is the sendcount
243        2) 68 10 10 10 is the recvcounts
244        3) 0 is the root node
245        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
246        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
247   */
248   comm_size = MPI_COMM_WORLD->size();
249   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
250   send_size  = parse_double(action[2]);
251   disps      = std::vector<int>(comm_size, 0);
252   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
253
254   if (name == "gatherV") {
255     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
256     if (action.size() > 4 + comm_size)
257       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
258     if (action.size() > 5 + comm_size)
259       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
260   } else {
261     int datatype_index = 0;
262     int disp_index     = 0;
263     /* The 3 comes from "0 gather <sendcount>", which must always be present.
264      * The + comm_size is the recvcounts array, which must also be present
265      */
266     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
267       datatype_index = 3 + comm_size;
268       disp_index     = datatype_index + 1;
269       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
270       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
271     } else if (action.size() >
272                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
273       disp_index = 3 + comm_size;
274     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
275       datatype_index = 3 + comm_size;
276       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
277       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
278     }
279
280     if (disp_index != 0) {
281       for (unsigned int i = 0; i < comm_size; i++)
282         disps[i]          = std::stoi(action[disp_index + i]);
283     }
284   }
285
286   for (unsigned int i = 0; i < comm_size; i++) {
287     (*recvcounts)[i] = std::stoi(action[i + 3]);
288   }
289   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
290 }
291
292 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
293 {
294   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
295         0 gather 68 68 0 0 0
296       where:
297         1) 68 is the sendcounts
298         2) 68 is the recvcounts
299         3) 0 is the root node
300         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
301         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
302   */
303   CHECK_ACTION_PARAMS(action, 2, 3)
304   comm_size = MPI_COMM_WORLD->size();
305   send_size = parse_double(action[2]);
306   recv_size = parse_double(action[3]);
307   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
308   if (action.size() > 5)
309     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
310   if (action.size() > 6)
311     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
312 }
313
314 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
315 {
316   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
317      0 gather 68 10 10 10 68 0 0 0
318       where:
319       1) 68 10 10 10 is the sendcounts
320       2) 68 is the recvcount
321       3) 0 is the root node
322       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
323       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
324   */
325   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
326   recv_size  = parse_double(action[2 + comm_size]);
327   disps      = std::vector<int>(comm_size, 0);
328   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
329
330   if (action.size() > 5 + comm_size)
331     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
332   if (action.size() > 5 + comm_size)
333     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
334
335   for (unsigned int i = 0; i < comm_size; i++) {
336     (*sendcounts)[i] = std::stoi(action[i + 2]);
337   }
338   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
339   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
340 }
341
342 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
343 {
344   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
345        0 reduceScatter 275427 275427 275427 204020 11346849 0
346      where:
347        1) The first four values after the name of the action declare the recvcounts array
348        2) The value 11346849 is the amount of instructions
349        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
350   */
351   comm_size = MPI_COMM_WORLD->size();
352   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
353   comp_size  = parse_double(action[2 + comm_size]);
354   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
355   if (action.size() > 3 + comm_size)
356     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
357
358   for (unsigned int i = 0; i < comm_size; i++) {
359     recvcounts->push_back(std::stoi(action[i + 2]));
360   }
361   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
362 }
363
364 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
365 {
366   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
367         0 allToAllV 100 1 7 10 12 100 1 70 10 5
368      where:
369       1) 100 is the size of the send buffer *sizeof(int),
370       2) 1 7 10 12 is the sendcounts array
371       3) 100*sizeof(int) is the size of the receiver buffer
372       4)  1 70 10 5 is the recvcounts array
373   */
374   comm_size = MPI_COMM_WORLD->size();
375   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
376   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
377   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
378   senddisps  = std::vector<int>(comm_size, 0);
379   recvdisps  = std::vector<int>(comm_size, 0);
380
381   if (action.size() > 5 + 2 * comm_size)
382     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
383   if (action.size() > 5 + 2 * comm_size)
384     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
385
386   send_buf_size = parse_double(action[2]);
387   recv_buf_size = parse_double(action[3 + comm_size]);
388   for (unsigned int i = 0; i < comm_size; i++) {
389     (*sendcounts)[i] = std::stoi(action[3 + i]);
390     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
391   }
392   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
393   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
394 }
395
396 template<class T>
397 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
398 {
399   // Needs to be re-initialized for every action, hence here
400   double start_time = smpi_process()->simulated_elapsed();
401   args.parse(action, name);
402   kernel(action);
403   if (name != "Init")
404     log_timed_action(action, start_time);
405 }
406
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
408 {
409   std::string s = boost::algorithm::join(action, " ");
410   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
412   req_storage.remove(request);
413
414   if (request == MPI_REQUEST_NULL) {
415     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
416      * return.*/
417     return;
418   }
419
420   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
421
422   // Must be taken before Request::wait() since the request may be set to
423   // MPI_REQUEST_NULL by Request::wait!
424   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
425   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
426   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
427
428   MPI_Status status;
429   Request::wait(&request, &status);
430
431   TRACE_smpi_comm_out(rank);
432   if (is_wait_for_receive)
433     TRACE_smpi_recv(args.src, args.dst, args.tag);
434   }
435
436   void SendAction::kernel(simgrid::xbt::ReplayAction& action)
437   {
438     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
439
440     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
441                                                                              args.tag, Datatype::encode(args.datatype1)));
442     if (not TRACE_smpi_view_internals())
443       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
444
445     if (name == "send") {
446       Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447     } else if (name == "Isend") {
448       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
449       req_storage.add(request);
450     } else {
451       xbt_die("Don't know this action, %s", name.c_str());
452     }
453
454     TRACE_smpi_comm_out(my_proc_id);
455   }
456
457   void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
458   {
459     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
460
461     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
462                                                                              args.tag, Datatype::encode(args.datatype1)));
463
464     MPI_Status status;
465     // unknown size from the receiver point of view
466     if (args.size <= 0.0) {
467       Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
468       args.size = status.count;
469     }
470
471     if (name == "recv") {
472       Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
473     } else if (name == "Irecv") {
474       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
475       req_storage.add(request);
476     }
477
478     TRACE_smpi_comm_out(my_proc_id);
479     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
480     if (name == "recv" && not TRACE_smpi_view_internals()) {
481       TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
482     }
483   }
484
485   void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
486   {
487     TRACE_smpi_computing_in(my_proc_id, args.flops);
488     smpi_execute_flops(args.flops);
489     TRACE_smpi_computing_out(my_proc_id);
490   }
491
492   void TestAction::kernel(simgrid::xbt::ReplayAction& action)
493   {
494     MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
495     req_storage.remove(request);
496     // if request is null here, this may mean that a previous test has succeeded
497     // Different times in traced application and replayed version may lead to this
498     // In this case, ignore the extra calls.
499     if (request != MPI_REQUEST_NULL) {
500       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
501
502       MPI_Status status;
503       int flag = Request::test(&request, &status);
504
505       XBT_DEBUG("MPI_Test result: %d", flag);
506       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
507        * nullptr.*/
508       if (request == MPI_REQUEST_NULL)
509         req_storage.addNullRequest(args.src, args.dst, args.tag);
510       else
511         req_storage.add(request);
512
513       TRACE_smpi_comm_out(my_proc_id);
514     }
515   }
516
517   void InitAction::kernel(simgrid::xbt::ReplayAction& action)
518   {
519     CHECK_ACTION_PARAMS(action, 0, 1)
520     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
521                                            : MPI_BYTE;  // default TAU datatype
522
523     /* start a simulated timer */
524     smpi_process()->simulated_start();
525   }
526
527   void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
528   {
529     /* nothing to do */
530   }
531
532   void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
533   {
534     const unsigned int count_requests = req_storage.size();
535
536     if (count_requests > 0) {
537       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
538       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
539       std::vector<MPI_Request> reqs;
540       req_storage.get_requests(reqs);
541       for (const auto& req : reqs) {
542         if (req && (req->flags() & MPI_REQ_RECV)) {
543           sender_receiver.push_back({req->src(), req->dst()});
544         }
545       }
546       MPI_Status status[count_requests];
547       Request::waitall(count_requests, &(reqs.data())[0], status);
548       req_storage.get_store().clear();
549
550       for (auto& pair : sender_receiver) {
551         TRACE_smpi_recv(pair.first, pair.second, 0);
552       }
553       TRACE_smpi_comm_out(my_proc_id);
554     }
555   }
556
557   void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
558   {
559     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
560     Colls::barrier(MPI_COMM_WORLD);
561     TRACE_smpi_comm_out(my_proc_id);
562   }
563
564   void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
565   {
566     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
567                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
568                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
569
570     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
571
572     TRACE_smpi_comm_out(my_proc_id);
573   }
574
575   void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
576   {
577     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
578                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
579                                                       args.comp_size, args.comm_size, -1,
580                                                       Datatype::encode(args.datatype1), ""));
581
582     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
583         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
584     smpi_execute_flops(args.comp_size);
585
586     TRACE_smpi_comm_out(my_proc_id);
587   }
588
589   void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
590   {
591     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
592                                                                                 Datatype::encode(args.datatype1), ""));
593
594     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
595         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
596     smpi_execute_flops(args.comp_size);
597
598     TRACE_smpi_comm_out(my_proc_id);
599   }
600
601   void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
602   {
603     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
604                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
605                                                     Datatype::encode(args.datatype1),
606                                                     Datatype::encode(args.datatype2)));
607
608     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
609                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
610                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
611
612     TRACE_smpi_comm_out(my_proc_id);
613   }
614
615   void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
616   {
617     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
618                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
619
620     if (name == "gather") {
621       int rank = MPI_COMM_WORLD->rank();
622       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
623                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
624     }
625     else
626       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
627                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
628
629     TRACE_smpi_comm_out(my_proc_id);
630   }
631
632   void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
633   {
634     int rank = MPI_COMM_WORLD->rank();
635
636     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
637                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
638                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
639
640     if (name == "gatherV") {
641       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
642                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
643                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
644     }
645     else {
646       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
647                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
648                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
649     }
650
651     TRACE_smpi_comm_out(my_proc_id);
652   }
653
654   void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
655   {
656     int rank = MPI_COMM_WORLD->rank();
657     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
658                                                                           Datatype::encode(args.datatype1),
659                                                                           Datatype::encode(args.datatype2)));
660
661     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
663
664     TRACE_smpi_comm_out(my_proc_id);
665   }
666
667   void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
668   {
669     int rank = MPI_COMM_WORLD->rank();
670     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
671           nullptr, Datatype::encode(args.datatype1),
672           Datatype::encode(args.datatype2)));
673
674     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
675                     args.sendcounts->data(), args.disps.data(), args.datatype1,
676                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
677                     MPI_COMM_WORLD);
678
679     TRACE_smpi_comm_out(my_proc_id);
680   }
681
682   void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
683   {
684     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
685                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
686                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
687                                                          Datatype::encode(args.datatype1)));
688
689     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
690                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
691                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
692
693     smpi_execute_flops(args.comp_size);
694     TRACE_smpi_comm_out(my_proc_id);
695   }
696
697   void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
698   {
699     TRACE_smpi_comm_in(my_proc_id, __func__,
700                        new simgrid::instr::VarCollTIData(
701                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
702                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
703
704     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
705                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
706
707     TRACE_smpi_comm_out(my_proc_id);
708   }
709 } // Replay Namespace
710 }} // namespace simgrid::smpi
711
712 static std::vector<simgrid::smpi::replay::RequestStorage> storage;
713 /** @brief Only initialize the replay, don't do it for real */
714 void smpi_replay_init(int* argc, char*** argv)
715 {
716   simgrid::smpi::Process::init(argc, argv);
717   smpi_process()->mark_as_initialized();
718   smpi_process()->set_replaying(true);
719
720   int my_proc_id = simgrid::s4u::this_actor::get_pid();
721   storage.resize(smpi_process_count());
722
723   TRACE_smpi_init(my_proc_id);
724   TRACE_smpi_computing_init(my_proc_id);
725   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
726   TRACE_smpi_comm_out(my_proc_id);
727   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
728   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
729   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
730   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
731   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
732   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
733   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
734   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
735   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
736   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
737   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
738   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
739   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
740   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
741   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
742   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
743   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
744   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
745   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
746   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
747   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
748   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
749   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
750   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
751   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
752   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
753
754   //if we have a delayed start, sleep here.
755   if(*argc>2){
756     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
757     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
758     smpi_execute_flops(value);
759   } else {
760     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
761     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
762     smpi_execute_flops(0.0);
763   }
764 }
765
766 /** @brief actually run the replay after initialization */
767 void smpi_replay_main(int* argc, char*** argv)
768 {
769   static int active_processes = 0;
770   active_processes++;
771   simgrid::xbt::replay_runner(*argc, *argv);
772
773   /* and now, finalize everything */
774   /* One active process will stop. Decrease the counter*/
775   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
776   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
777   if (count_requests > 0) {
778     MPI_Request requests[count_requests];
779     MPI_Status status[count_requests];
780     unsigned int i=0;
781
782     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
783       requests[i] = pair.second;
784       i++;
785     }
786     simgrid::smpi::Request::waitall(count_requests, requests, status);
787   }
788   active_processes--;
789
790   if(active_processes==0){
791     /* Last process alive speaking: end the simulated timer */
792     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
793     smpi_free_replay_tmp_buffers();
794   }
795
796   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
797                      new simgrid::instr::NoOpTIData("finalize"));
798
799   smpi_process()->finalize();
800
801   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
802   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
803 }
804
805 /** @brief chain a replay initialization and a replay start */
806 void smpi_replay_run(int* argc, char*** argv)
807 {
808   smpi_replay_init(argc, argv);
809   smpi_replay_main(argc, argv);
810 }