Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
04997ef74d7628f1bc605c8df128b31f676cc7cf
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 0)
171   time = parse_double(action[2]);
172 }
173
174 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 2, 0)
177   filename = std::string(action[2]);
178   line = std::stoi(action[3]);
179 }
180
181 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 {
183   CHECK_ACTION_PARAMS(action, 1, 2)
184   size = parse_double(action[2]);
185   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
186   if (action.size() > 4)
187     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
188 }
189
190 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 {
192   CHECK_ACTION_PARAMS(action, 2, 2)
193   comm_size = parse_double(action[2]);
194   comp_size = parse_double(action[3]);
195   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
196   if (action.size() > 5)
197     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
198 }
199
200 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 1)
203   comm_size = parse_double(action[2]);
204   comp_size = parse_double(action[3]);
205   if (action.size() > 4)
206     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
207 }
208
209 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   comm_size = MPI_COMM_WORLD->size();
213   send_size = parse_double(action[2]);
214   recv_size = parse_double(action[3]);
215
216   if (action.size() > 4)
217     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
218   if (action.size() > 5)
219     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
220 }
221
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
223 {
224   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
225         0 gather 68 68 0 0 0
226       where:
227         1) 68 is the sendcounts
228         2) 68 is the recvcounts
229         3) 0 is the root node
230         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
232   */
233   CHECK_ACTION_PARAMS(action, 2, 3)
234   comm_size = MPI_COMM_WORLD->size();
235   send_size = parse_double(action[2]);
236   recv_size = parse_double(action[3]);
237
238   if (name == "gather") {
239     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
240     if (action.size() > 5)
241       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
242     if (action.size() > 6)
243       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
244   } else {
245     if (action.size() > 4)
246       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
247     if (action.size() > 5)
248       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
249   }
250 }
251
252 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
253 {
254   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
255        0 gather 68 68 10 10 10 0 0 0
256      where:
257        1) 68 is the sendcount
258        2) 68 10 10 10 is the recvcounts
259        3) 0 is the root node
260        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
261        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
262   */
263   comm_size = MPI_COMM_WORLD->size();
264   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
265   send_size  = parse_double(action[2]);
266   disps      = std::vector<int>(comm_size, 0);
267   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
268
269   if (name == "gatherv") {
270     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
271     if (action.size() > 4 + comm_size)
272       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
273     if (action.size() > 5 + comm_size)
274       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
275   } else {
276     int datatype_index = 0;
277     int disp_index     = 0;
278     /* The 3 comes from "0 gather <sendcount>", which must always be present.
279      * The + comm_size is the recvcounts array, which must also be present
280      */
281     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
282       datatype_index = 3 + comm_size;
283       disp_index     = datatype_index + 1;
284       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
285       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
286     } else if (action.size() >
287                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
288       disp_index = 3 + comm_size;
289     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
290       datatype_index = 3 + comm_size;
291       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
292       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
293     }
294
295     if (disp_index != 0) {
296       for (unsigned int i = 0; i < comm_size; i++)
297         disps[i]          = std::stoi(action[disp_index + i]);
298     }
299   }
300
301   for (unsigned int i = 0; i < comm_size; i++) {
302     (*recvcounts)[i] = std::stoi(action[i + 3]);
303   }
304   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
305 }
306
307 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
308 {
309   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
310         0 gather 68 68 0 0 0
311       where:
312         1) 68 is the sendcounts
313         2) 68 is the recvcounts
314         3) 0 is the root node
315         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
316         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
317   */
318   CHECK_ACTION_PARAMS(action, 2, 3)
319   comm_size = MPI_COMM_WORLD->size();
320   send_size = parse_double(action[2]);
321   recv_size = parse_double(action[3]);
322   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
323   if (action.size() > 5)
324     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
325   if (action.size() > 6)
326     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
327 }
328
329 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
330 {
331   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
332      0 gather 68 10 10 10 68 0 0 0
333       where:
334       1) 68 10 10 10 is the sendcounts
335       2) 68 is the recvcount
336       3) 0 is the root node
337       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
338       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
339   */
340   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
341   recv_size  = parse_double(action[2 + comm_size]);
342   disps      = std::vector<int>(comm_size, 0);
343   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
344
345   if (action.size() > 5 + comm_size)
346     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
347   if (action.size() > 5 + comm_size)
348     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
349
350   for (unsigned int i = 0; i < comm_size; i++) {
351     (*sendcounts)[i] = std::stoi(action[i + 2]);
352   }
353   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
354   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
355 }
356
357 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
358 {
359   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
360        0 reducescatter 275427 275427 275427 204020 11346849 0
361      where:
362        1) The first four values after the name of the action declare the recvcounts array
363        2) The value 11346849 is the amount of instructions
364        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
365   */
366   comm_size = MPI_COMM_WORLD->size();
367   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
368   comp_size  = parse_double(action[2 + comm_size]);
369   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
370   if (action.size() > 3 + comm_size)
371     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
372
373   for (unsigned int i = 0; i < comm_size; i++) {
374     recvcounts->push_back(std::stoi(action[i + 2]));
375   }
376   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
377 }
378
379 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
380 {
381   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
382         0 alltoallv 100 1 7 10 12 100 1 70 10 5
383      where:
384       1) 100 is the size of the send buffer *sizeof(int),
385       2) 1 7 10 12 is the sendcounts array
386       3) 100*sizeof(int) is the size of the receiver buffer
387       4)  1 70 10 5 is the recvcounts array
388   */
389   comm_size = MPI_COMM_WORLD->size();
390   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
391   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
392   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
393   senddisps  = std::vector<int>(comm_size, 0);
394   recvdisps  = std::vector<int>(comm_size, 0);
395
396   if (action.size() > 5 + 2 * comm_size)
397     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
398   if (action.size() > 5 + 2 * comm_size)
399     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
400
401   send_buf_size = parse_double(action[2]);
402   recv_buf_size = parse_double(action[3 + comm_size]);
403   for (unsigned int i = 0; i < comm_size; i++) {
404     (*sendcounts)[i] = std::stoi(action[3 + i]);
405     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
406   }
407   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
408   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
409 }
410
411 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
412 {
413   std::string s = boost::algorithm::join(action, " ");
414   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
415   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
416   req_storage.remove(request);
417
418   if (request == MPI_REQUEST_NULL) {
419     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
420      * return.*/
421     return;
422   }
423
424   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
425
426   // Must be taken before Request::wait() since the request may be set to
427   // MPI_REQUEST_NULL by Request::wait!
428   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
429   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
430   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
431
432   MPI_Status status;
433   Request::wait(&request, &status);
434
435   TRACE_smpi_comm_out(rank);
436   if (is_wait_for_receive)
437     TRACE_smpi_recv(args.src, args.dst, args.tag);
438 }
439
440 void SendAction::kernel(simgrid::xbt::ReplayAction&)
441 {
442   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
443
444   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
445         args.tag, Datatype::encode(args.datatype1)));
446   if (not TRACE_smpi_view_internals())
447     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
448
449   if (name == "send") {
450     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
451   } else if (name == "isend") {
452     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
453     req_storage.add(request);
454   } else {
455     xbt_die("Don't know this action, %s", name.c_str());
456   }
457
458   TRACE_smpi_comm_out(my_proc_id);
459 }
460
461 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
462 {
463   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
464         args.tag, Datatype::encode(args.datatype1)));
465
466   MPI_Status status;
467   // unknown size from the receiver point of view
468   if (args.size <= 0.0) {
469     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
470     args.size = status.count;
471   }
472
473   bool is_recv = false; // Help analyzers understanding that status is not used unintialized
474   if (name == "recv") {
475     is_recv = true;
476     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
477   } else if (name == "irecv") {
478     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
479     req_storage.add(request);
480   } else {
481     THROW_IMPOSSIBLE;
482   }
483
484   TRACE_smpi_comm_out(my_proc_id);
485   if (is_recv && not TRACE_smpi_view_internals()) {
486     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
487     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
488   }
489 }
490
491 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
492 {
493   smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
494
495 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
496 {
497   XBT_DEBUG("Sleep for: %lf secs", args.time);
498   int rank = simgrid::s4u::this_actor::get_pid();
499   TRACE_smpi_sleeping_in(rank, args.time);
500   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
501   TRACE_smpi_sleeping_out(rank);
502 }
503
504 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
505 {
506   smpi_trace_set_call_location(args.filename.c_str(), args.line);
507 }
508
509 void TestAction::kernel(simgrid::xbt::ReplayAction&)
510 {
511   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
512   req_storage.remove(request);
513   // if request is null here, this may mean that a previous test has succeeded
514   // Different times in traced application and replayed version may lead to this
515   // In this case, ignore the extra calls.
516   if (request != MPI_REQUEST_NULL) {
517     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
518
519     MPI_Status status;
520     int flag = 0;
521     Request::test(&request, &status, &flag);
522
523     XBT_DEBUG("MPI_Test result: %d", flag);
524     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
525      * nullptr.*/
526     if (request == MPI_REQUEST_NULL)
527       req_storage.addNullRequest(args.src, args.dst, args.tag);
528     else
529       req_storage.add(request);
530
531     TRACE_smpi_comm_out(my_proc_id);
532   }
533 }
534
535 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
536 {
537   CHECK_ACTION_PARAMS(action, 0, 1)
538     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
539     : MPI_BYTE;  // default TAU datatype
540
541   /* start a simulated timer */
542   smpi_process()->simulated_start();
543 }
544
545 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
546 {
547   /* nothing to do */
548 }
549
550 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
551 {
552   const unsigned int count_requests = req_storage.size();
553
554   if (count_requests > 0) {
555     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
556     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
557     std::vector<MPI_Request> reqs;
558     req_storage.get_requests(reqs);
559     for (const auto& req : reqs) {
560       if (req && (req->flags() & MPI_REQ_RECV)) {
561         sender_receiver.push_back({req->src(), req->dst()});
562       }
563     }
564     MPI_Status status[count_requests];
565     Request::waitall(count_requests, &(reqs.data())[0], status);
566     req_storage.get_store().clear();
567
568     for (auto& pair : sender_receiver) {
569       TRACE_smpi_recv(pair.first, pair.second, 0);
570     }
571     TRACE_smpi_comm_out(my_proc_id);
572   }
573 }
574
575 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
576 {
577   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
578   Colls::barrier(MPI_COMM_WORLD);
579   TRACE_smpi_comm_out(my_proc_id);
580 }
581
582 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
583 {
584   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
585       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
586         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
587
588   Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
589
590   TRACE_smpi_comm_out(my_proc_id);
591 }
592
593 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
594 {
595   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
596       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
597         args.comp_size, args.comm_size, -1,
598         Datatype::encode(args.datatype1), ""));
599
600   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
601       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
602   private_execute_flops(args.comp_size);
603
604   TRACE_smpi_comm_out(my_proc_id);
605 }
606
607 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
608 {
609   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
610         Datatype::encode(args.datatype1), ""));
611
612   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
613       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
614   private_execute_flops(args.comp_size);
615
616   TRACE_smpi_comm_out(my_proc_id);
617 }
618
619 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
620 {
621   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
622       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
623         Datatype::encode(args.datatype1),
624         Datatype::encode(args.datatype2)));
625
626   Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
627       args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
628       args.recv_size, args.datatype2, MPI_COMM_WORLD);
629
630   TRACE_smpi_comm_out(my_proc_id);
631 }
632
633 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
634 {
635   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
636         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
637
638   if (name == "gather") {
639     int rank = MPI_COMM_WORLD->rank();
640     Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
641         (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
642   }
643   else
644     Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
645         recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
646
647   TRACE_smpi_comm_out(my_proc_id);
648 }
649
650 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
651 {
652   int rank = MPI_COMM_WORLD->rank();
653
654   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
655         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
656         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
657
658   if (name == "gatherv") {
659     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
660         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
661         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
662   }
663   else {
664     Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
665         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
666         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
667   }
668
669   TRACE_smpi_comm_out(my_proc_id);
670 }
671
672 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
673 {
674   int rank = MPI_COMM_WORLD->rank();
675   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
676         Datatype::encode(args.datatype1),
677         Datatype::encode(args.datatype2)));
678
679   Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
680       (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
681
682   TRACE_smpi_comm_out(my_proc_id);
683 }
684
685 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
686 {
687   int rank = MPI_COMM_WORLD->rank();
688   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
689         nullptr, Datatype::encode(args.datatype1),
690         Datatype::encode(args.datatype2)));
691
692   Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
693       args.sendcounts->data(), args.disps.data(), args.datatype1,
694       recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
695       MPI_COMM_WORLD);
696
697   TRACE_smpi_comm_out(my_proc_id);
698 }
699
700 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
701 {
702   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
703       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
704         std::to_string(args.comp_size), /* ugly hack to print comp_size */
705         Datatype::encode(args.datatype1)));
706
707   Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
708       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
709       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
710
711   private_execute_flops(args.comp_size);
712   TRACE_smpi_comm_out(my_proc_id);
713 }
714
715 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
716 {
717   TRACE_smpi_comm_in(my_proc_id, __func__,
718       new simgrid::instr::VarCollTIData(
719         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
720         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
721
722   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
723       recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
724
725   TRACE_smpi_comm_out(my_proc_id);
726 }
727 } // Replay Namespace
728 }} // namespace simgrid::smpi
729
730 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
731 /** @brief Only initialize the replay, don't do it for real */
732 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
733 {
734   xbt_assert(not smpi_process()->initializing());
735
736   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
737   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
738   simgrid::smpi::ActorExt::init();
739
740   smpi_process()->mark_as_initialized();
741   smpi_process()->set_replaying(true);
742
743   int my_proc_id = simgrid::s4u::this_actor::get_pid();
744
745   TRACE_smpi_init(my_proc_id);
746   TRACE_smpi_computing_init(my_proc_id);
747   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
748   TRACE_smpi_comm_out(my_proc_id);
749   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
750   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
751   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
752   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
753   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
754   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
755   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
756   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
757   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
758   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
759   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
760   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
761   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
762   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
763   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
764   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
765   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
766   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
767   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
768   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
769   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
770   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
771   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
772   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
773   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
774   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
775   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
776   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
777
778   //if we have a delayed start, sleep here.
779   if (start_delay_flops > 0) {
780     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
781     private_execute_flops(start_delay_flops);
782   } else {
783     // Wait for the other actors to initialize also
784     simgrid::s4u::this_actor::yield();
785   }
786 }
787
788 /** @brief actually run the replay after initialization */
789 void smpi_replay_main(int rank, const char* trace_filename)
790 {
791   static int active_processes = 0;
792   active_processes++;
793   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
794   std::string rank_string                      = std::to_string(rank);
795   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
796
797   /* and now, finalize everything */
798   /* One active process will stop. Decrease the counter*/
799   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
800   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
801   if (count_requests > 0) {
802     MPI_Request requests[count_requests];
803     MPI_Status status[count_requests];
804     unsigned int i=0;
805
806     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
807       requests[i] = pair.second;
808       i++;
809     }
810     simgrid::smpi::Request::waitall(count_requests, requests, status);
811   }
812   active_processes--;
813
814   if(active_processes==0){
815     /* Last process alive speaking: end the simulated timer */
816     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
817     smpi_free_replay_tmp_buffers();
818   }
819
820   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
821                      new simgrid::instr::NoOpTIData("finalize"));
822
823   smpi_process()->finalize();
824
825   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
826 }
827
828 /** @brief chain a replay initialization and a replay start */
829 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
830 {
831   smpi_replay_init(instance_id, rank, start_delay_flops);
832   smpi_replay_main(rank, trace_filename);
833 }