Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
93114a32d3c8afe90f3f3f5af0368029d15818cb
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 0)
171   time = parse_double(action[2]);
172 }
173
174 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 2, 0)
177   filename = std::string(action[2]);
178   line = std::stoi(action[3]);
179 }
180
181 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 {
183   CHECK_ACTION_PARAMS(action, 1, 2)
184   size = parse_double(action[2]);
185   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
186   if (action.size() > 4)
187     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
188 }
189
190 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 {
192   CHECK_ACTION_PARAMS(action, 2, 2)
193   comm_size = parse_double(action[2]);
194   comp_size = parse_double(action[3]);
195   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
196   if (action.size() > 5)
197     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
198 }
199
200 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 1)
203   comm_size = parse_double(action[2]);
204   comp_size = parse_double(action[3]);
205   if (action.size() > 4)
206     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
207 }
208
209 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   comm_size = MPI_COMM_WORLD->size();
213   send_size = parse_double(action[2]);
214   recv_size = parse_double(action[3]);
215
216   if (action.size() > 4)
217     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
218   if (action.size() > 5)
219     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
220 }
221
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
223 {
224   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
225         0 gather 68 68 0 0 0
226       where:
227         1) 68 is the sendcounts
228         2) 68 is the recvcounts
229         3) 0 is the root node
230         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
232   */
233   CHECK_ACTION_PARAMS(action, 2, 3)
234   comm_size = MPI_COMM_WORLD->size();
235   send_size = parse_double(action[2]);
236   recv_size = parse_double(action[3]);
237
238   if (name == "gather") {
239     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
240     if (action.size() > 5)
241       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
242     if (action.size() > 6)
243       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
244   } else {
245     if (action.size() > 4)
246       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
247     if (action.size() > 5)
248       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
249   }
250 }
251
252 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
253 {
254   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
255        0 gather 68 68 10 10 10 0 0 0
256      where:
257        1) 68 is the sendcount
258        2) 68 10 10 10 is the recvcounts
259        3) 0 is the root node
260        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
261        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
262   */
263   comm_size = MPI_COMM_WORLD->size();
264   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
265   send_size  = parse_double(action[2]);
266   disps      = std::vector<int>(comm_size, 0);
267   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
268
269   if (name == "gatherv") {
270     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
271     if (action.size() > 4 + comm_size)
272       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
273     if (action.size() > 5 + comm_size)
274       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
275   } else {
276     int datatype_index = 0;
277     int disp_index     = 0;
278     /* The 3 comes from "0 gather <sendcount>", which must always be present.
279      * The + comm_size is the recvcounts array, which must also be present
280      */
281     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
282       datatype_index = 3 + comm_size;
283       disp_index     = datatype_index + 1;
284       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
285       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
286     } else if (action.size() >
287                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
288       disp_index = 3 + comm_size;
289     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
290       datatype_index = 3 + comm_size;
291       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
292       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
293     }
294
295     if (disp_index != 0) {
296       for (unsigned int i = 0; i < comm_size; i++)
297         disps[i]          = std::stoi(action[disp_index + i]);
298     }
299   }
300
301   for (unsigned int i = 0; i < comm_size; i++) {
302     (*recvcounts)[i] = std::stoi(action[i + 3]);
303   }
304   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
305 }
306
307 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
308 {
309   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
310         0 gather 68 68 0 0 0
311       where:
312         1) 68 is the sendcounts
313         2) 68 is the recvcounts
314         3) 0 is the root node
315         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
316         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
317   */
318   CHECK_ACTION_PARAMS(action, 2, 3)
319   comm_size = MPI_COMM_WORLD->size();
320   send_size = parse_double(action[2]);
321   recv_size = parse_double(action[3]);
322   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
323   if (action.size() > 5)
324     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
325   if (action.size() > 6)
326     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
327 }
328
329 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
330 {
331   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
332      0 gather 68 10 10 10 68 0 0 0
333       where:
334       1) 68 10 10 10 is the sendcounts
335       2) 68 is the recvcount
336       3) 0 is the root node
337       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
338       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
339   */
340   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
341   recv_size  = parse_double(action[2 + comm_size]);
342   disps      = std::vector<int>(comm_size, 0);
343   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
344
345   if (action.size() > 5 + comm_size)
346     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
347   if (action.size() > 5 + comm_size)
348     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
349
350   for (unsigned int i = 0; i < comm_size; i++) {
351     (*sendcounts)[i] = std::stoi(action[i + 2]);
352   }
353   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
354   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
355 }
356
357 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
358 {
359   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
360        0 reducescatter 275427 275427 275427 204020 11346849 0
361      where:
362        1) The first four values after the name of the action declare the recvcounts array
363        2) The value 11346849 is the amount of instructions
364        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
365   */
366   comm_size = MPI_COMM_WORLD->size();
367   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
368   comp_size  = parse_double(action[2 + comm_size]);
369   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
370   if (action.size() > 3 + comm_size)
371     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
372
373   for (unsigned int i = 0; i < comm_size; i++) {
374     recvcounts->push_back(std::stoi(action[i + 2]));
375   }
376   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
377 }
378
379 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
380 {
381   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
382         0 alltoallv 100 1 7 10 12 100 1 70 10 5
383      where:
384       1) 100 is the size of the send buffer *sizeof(int),
385       2) 1 7 10 12 is the sendcounts array
386       3) 100*sizeof(int) is the size of the receiver buffer
387       4)  1 70 10 5 is the recvcounts array
388   */
389   comm_size = MPI_COMM_WORLD->size();
390   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
391   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
392   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
393   senddisps  = std::vector<int>(comm_size, 0);
394   recvdisps  = std::vector<int>(comm_size, 0);
395
396   if (action.size() > 5 + 2 * comm_size)
397     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
398   if (action.size() > 5 + 2 * comm_size)
399     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
400
401   send_buf_size = parse_double(action[2]);
402   recv_buf_size = parse_double(action[3 + comm_size]);
403   for (unsigned int i = 0; i < comm_size; i++) {
404     (*sendcounts)[i] = std::stoi(action[3 + i]);
405     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
406   }
407   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
408   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
409 }
410
411 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
412 {
413   std::string s = boost::algorithm::join(action, " ");
414   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
415   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
416   req_storage.remove(request);
417
418   if (request == MPI_REQUEST_NULL) {
419     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
420      * return.*/
421     return;
422   }
423
424   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
425
426   // Must be taken before Request::wait() since the request may be set to
427   // MPI_REQUEST_NULL by Request::wait!
428   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
429   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
430   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
431
432   MPI_Status status;
433   Request::wait(&request, &status);
434
435   TRACE_smpi_comm_out(rank);
436   if (is_wait_for_receive)
437     TRACE_smpi_recv(args.src, args.dst, args.tag);
438 }
439
440 void SendAction::kernel(simgrid::xbt::ReplayAction&)
441 {
442   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
443
444   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
445         args.tag, Datatype::encode(args.datatype1)));
446   if (not TRACE_smpi_view_internals())
447     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
448
449   if (name == "send") {
450     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
451   } else if (name == "isend") {
452     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
453     req_storage.add(request);
454   } else {
455     xbt_die("Don't know this action, %s", name.c_str());
456   }
457
458   TRACE_smpi_comm_out(my_proc_id);
459 }
460
461 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
462 {
463   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
464         args.tag, Datatype::encode(args.datatype1)));
465
466   MPI_Status status;
467   // unknown size from the receiver point of view
468   if (args.size <= 0.0) {
469     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
470     args.size = status.count;
471   }
472
473   bool is_recv = false; // Help analyzers understanding that status is not used unintialized
474   if (name == "recv") {
475     is_recv = true;
476     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
477   } else if (name == "irecv") {
478     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
479     req_storage.add(request);
480   } else {
481     THROW_IMPOSSIBLE;
482   }
483
484   TRACE_smpi_comm_out(my_proc_id);
485   if (is_recv && not TRACE_smpi_view_internals()) {
486     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
487     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
488   }
489 }
490
491 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
492 {
493   if (simgrid::config::get_value<bool>("smpi/simulate-computation")) {
494     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
495   }
496 }
497
498 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
499 {
500   XBT_DEBUG("Sleep for: %lf secs", args.time);
501   int rank = simgrid::s4u::this_actor::get_pid();
502   TRACE_smpi_sleeping_in(rank, args.time);
503   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
504   TRACE_smpi_sleeping_out(rank);
505 }
506
507 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
508 {
509   smpi_trace_set_call_location(args.filename.c_str(), args.line);
510 }
511
512 void TestAction::kernel(simgrid::xbt::ReplayAction&)
513 {
514   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
515   req_storage.remove(request);
516   // if request is null here, this may mean that a previous test has succeeded
517   // Different times in traced application and replayed version may lead to this
518   // In this case, ignore the extra calls.
519   if (request != MPI_REQUEST_NULL) {
520     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
521
522     MPI_Status status;
523     int flag = 0;
524     Request::test(&request, &status, &flag);
525
526     XBT_DEBUG("MPI_Test result: %d", flag);
527     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
528      * nullptr.*/
529     if (request == MPI_REQUEST_NULL)
530       req_storage.addNullRequest(args.src, args.dst, args.tag);
531     else
532       req_storage.add(request);
533
534     TRACE_smpi_comm_out(my_proc_id);
535   }
536 }
537
538 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
539 {
540   CHECK_ACTION_PARAMS(action, 0, 1)
541     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
542     : MPI_BYTE;  // default TAU datatype
543
544   /* start a simulated timer */
545   smpi_process()->simulated_start();
546 }
547
548 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
549 {
550   /* nothing to do */
551 }
552
553 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
554 {
555   const unsigned int count_requests = req_storage.size();
556
557   if (count_requests > 0) {
558     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
559     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
560     std::vector<MPI_Request> reqs;
561     req_storage.get_requests(reqs);
562     for (const auto& req : reqs) {
563       if (req && (req->flags() & MPI_REQ_RECV)) {
564         sender_receiver.push_back({req->src(), req->dst()});
565       }
566     }
567     MPI_Status status[count_requests];
568     Request::waitall(count_requests, &(reqs.data())[0], status);
569     req_storage.get_store().clear();
570
571     for (auto& pair : sender_receiver) {
572       TRACE_smpi_recv(pair.first, pair.second, 0);
573     }
574     TRACE_smpi_comm_out(my_proc_id);
575   }
576 }
577
578 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
579 {
580   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
581   Colls::barrier(MPI_COMM_WORLD);
582   TRACE_smpi_comm_out(my_proc_id);
583 }
584
585 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
586 {
587   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
588       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
589         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
590
591   Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
592
593   TRACE_smpi_comm_out(my_proc_id);
594 }
595
596 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
597 {
598   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
599       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
600         args.comp_size, args.comm_size, -1,
601         Datatype::encode(args.datatype1), ""));
602
603   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
604       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
605   private_execute_flops(args.comp_size);
606
607   TRACE_smpi_comm_out(my_proc_id);
608 }
609
610 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
611 {
612   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
613         Datatype::encode(args.datatype1), ""));
614
615   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
616       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
617   private_execute_flops(args.comp_size);
618
619   TRACE_smpi_comm_out(my_proc_id);
620 }
621
622 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
623 {
624   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
625       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
626         Datatype::encode(args.datatype1),
627         Datatype::encode(args.datatype2)));
628
629   Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
630       args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
631       args.recv_size, args.datatype2, MPI_COMM_WORLD);
632
633   TRACE_smpi_comm_out(my_proc_id);
634 }
635
636 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
637 {
638   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
639         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
640
641   if (name == "gather") {
642     int rank = MPI_COMM_WORLD->rank();
643     Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
644         (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
645   }
646   else
647     Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
648         recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
649
650   TRACE_smpi_comm_out(my_proc_id);
651 }
652
653 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
654 {
655   int rank = MPI_COMM_WORLD->rank();
656
657   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
658         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
659         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
660
661   if (name == "gatherv") {
662     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
663         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
664         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
665   }
666   else {
667     Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
668         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
669         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
670   }
671
672   TRACE_smpi_comm_out(my_proc_id);
673 }
674
675 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
676 {
677   int rank = MPI_COMM_WORLD->rank();
678   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
679         Datatype::encode(args.datatype1),
680         Datatype::encode(args.datatype2)));
681
682   Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
683       (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
684
685   TRACE_smpi_comm_out(my_proc_id);
686 }
687
688 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
689 {
690   int rank = MPI_COMM_WORLD->rank();
691   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
692         nullptr, Datatype::encode(args.datatype1),
693         Datatype::encode(args.datatype2)));
694
695   Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
696       args.sendcounts->data(), args.disps.data(), args.datatype1,
697       recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
698       MPI_COMM_WORLD);
699
700   TRACE_smpi_comm_out(my_proc_id);
701 }
702
703 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
704 {
705   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
706       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
707         std::to_string(args.comp_size), /* ugly hack to print comp_size */
708         Datatype::encode(args.datatype1)));
709
710   Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
711       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
712       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
713
714   private_execute_flops(args.comp_size);
715   TRACE_smpi_comm_out(my_proc_id);
716 }
717
718 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
719 {
720   TRACE_smpi_comm_in(my_proc_id, __func__,
721       new simgrid::instr::VarCollTIData(
722         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
723         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
724
725   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
726       recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
727
728   TRACE_smpi_comm_out(my_proc_id);
729 }
730 } // Replay Namespace
731 }} // namespace simgrid::smpi
732
733 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
734 /** @brief Only initialize the replay, don't do it for real */
735 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
736 {
737   xbt_assert(not smpi_process()->initializing());
738
739   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
740   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
741   simgrid::smpi::ActorExt::init();
742
743   smpi_process()->mark_as_initialized();
744   smpi_process()->set_replaying(true);
745
746   int my_proc_id = simgrid::s4u::this_actor::get_pid();
747
748   TRACE_smpi_init(my_proc_id);
749   TRACE_smpi_computing_init(my_proc_id);
750   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
751   TRACE_smpi_comm_out(my_proc_id);
752   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
753   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
754   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
755   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
756   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
757   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
758   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
759   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
760   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
761   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
762   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
763   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
764   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
765   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
766   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
767   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
768   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
769   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
770   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
771   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
772   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
773   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
774   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
775   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
776   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
777   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
778   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
779   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
780
781   //if we have a delayed start, sleep here.
782   if (start_delay_flops > 0) {
783     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
784     private_execute_flops(start_delay_flops);
785   } else {
786     // Wait for the other actors to initialize also
787     simgrid::s4u::this_actor::yield();
788   }
789 }
790
791 /** @brief actually run the replay after initialization */
792 void smpi_replay_main(int rank, const char* trace_filename)
793 {
794   static int active_processes = 0;
795   active_processes++;
796   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
797   std::string rank_string                      = std::to_string(rank);
798   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
799
800   /* and now, finalize everything */
801   /* One active process will stop. Decrease the counter*/
802   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
803   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
804   if (count_requests > 0) {
805     MPI_Request requests[count_requests];
806     MPI_Status status[count_requests];
807     unsigned int i=0;
808
809     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
810       requests[i] = pair.second;
811       i++;
812     }
813     simgrid::smpi::Request::waitall(count_requests, requests, status);
814   }
815   active_processes--;
816
817   if(active_processes==0){
818     /* Last process alive speaking: end the simulated timer */
819     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
820     smpi_free_replay_tmp_buffers();
821   }
822
823   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
824                      new simgrid::instr::NoOpTIData("finalize"));
825
826   smpi_process()->finalize();
827
828   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
829 }
830
831 /** @brief chain a replay initialization and a replay start */
832 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
833 {
834   smpi_replay_init(instance_id, rank, start_delay_flops);
835   smpi_replay_main(rank, trace_filename);
836 }