Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix use after delete.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 0)
171   time = parse_double(action[2]);
172 }
173
174 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 2, 0)
177   filename = std::string(action[2]);
178   line = std::stoi(action[3]);
179 }
180
181 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 {
183   CHECK_ACTION_PARAMS(action, 1, 2)
184   size = parse_double(action[2]);
185   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
186   if (action.size() > 4)
187     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
188 }
189
190 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 {
192   CHECK_ACTION_PARAMS(action, 2, 2)
193   comm_size = parse_double(action[2]);
194   comp_size = parse_double(action[3]);
195   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
196   if (action.size() > 5)
197     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
198 }
199
200 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 1)
203   comm_size = parse_double(action[2]);
204   comp_size = parse_double(action[3]);
205   if (action.size() > 4)
206     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
207 }
208
209 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   comm_size = MPI_COMM_WORLD->size();
213   send_size = parse_double(action[2]);
214   recv_size = parse_double(action[3]);
215
216   if (action.size() > 4)
217     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
218   if (action.size() > 5)
219     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
220 }
221
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
223 {
224   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
225         0 gather 68 68 0 0 0
226       where:
227         1) 68 is the sendcounts
228         2) 68 is the recvcounts
229         3) 0 is the root node
230         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
232   */
233   CHECK_ACTION_PARAMS(action, 2, 3)
234   comm_size = MPI_COMM_WORLD->size();
235   send_size = parse_double(action[2]);
236   recv_size = parse_double(action[3]);
237
238   if (name == "gather") {
239     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
240     if (action.size() > 5)
241       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
242     if (action.size() > 6)
243       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
244   } else {
245     if (action.size() > 4)
246       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
247     if (action.size() > 5)
248       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
249   }
250 }
251
252 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
253 {
254   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
255        0 gather 68 68 10 10 10 0 0 0
256      where:
257        1) 68 is the sendcount
258        2) 68 10 10 10 is the recvcounts
259        3) 0 is the root node
260        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
261        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
262   */
263   comm_size = MPI_COMM_WORLD->size();
264   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
265   send_size  = parse_double(action[2]);
266   disps      = std::vector<int>(comm_size, 0);
267   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
268
269   if (name == "gatherv") {
270     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
271     if (action.size() > 4 + comm_size)
272       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
273     if (action.size() > 5 + comm_size)
274       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
275   } else {
276     int disp_index     = 0;
277     /* The 3 comes from "0 gather <sendcount>", which must always be present.
278      * The + comm_size is the recvcounts array, which must also be present
279      */
280     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
281       int datatype_index = 3 + comm_size;
282       disp_index     = datatype_index + 1;
283       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
284       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
285     } else if (action.size() >
286                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
287       disp_index = 3 + comm_size;
288     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
289       int datatype_index = 3 + comm_size;
290       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
291       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
292     }
293
294     if (disp_index != 0) {
295       for (unsigned int i = 0; i < comm_size; i++)
296         disps[i]          = std::stoi(action[disp_index + i]);
297     }
298   }
299
300   for (unsigned int i = 0; i < comm_size; i++) {
301     (*recvcounts)[i] = std::stoi(action[i + 3]);
302   }
303   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
304 }
305
306 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
307 {
308   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
309         0 gather 68 68 0 0 0
310       where:
311         1) 68 is the sendcounts
312         2) 68 is the recvcounts
313         3) 0 is the root node
314         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
315         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
316   */
317   CHECK_ACTION_PARAMS(action, 2, 3)
318   comm_size = MPI_COMM_WORLD->size();
319   send_size = parse_double(action[2]);
320   recv_size = parse_double(action[3]);
321   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
322   if (action.size() > 5)
323     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
324   if (action.size() > 6)
325     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
326 }
327
328 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
329 {
330   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
331      0 gather 68 10 10 10 68 0 0 0
332       where:
333       1) 68 10 10 10 is the sendcounts
334       2) 68 is the recvcount
335       3) 0 is the root node
336       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
337       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
338   */
339   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
340   recv_size  = parse_double(action[2 + comm_size]);
341   disps      = std::vector<int>(comm_size, 0);
342   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
343
344   if (action.size() > 5 + comm_size)
345     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
346   if (action.size() > 5 + comm_size)
347     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
348
349   for (unsigned int i = 0; i < comm_size; i++) {
350     (*sendcounts)[i] = std::stoi(action[i + 2]);
351   }
352   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
353   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
354 }
355
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
357 {
358   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359        0 reducescatter 275427 275427 275427 204020 11346849 0
360      where:
361        1) The first four values after the name of the action declare the recvcounts array
362        2) The value 11346849 is the amount of instructions
363        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
364   */
365   comm_size = MPI_COMM_WORLD->size();
366   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367   comp_size  = parse_double(action[2 + comm_size]);
368   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
369   if (action.size() > 3 + comm_size)
370     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
371
372   for (unsigned int i = 0; i < comm_size; i++) {
373     recvcounts->push_back(std::stoi(action[i + 2]));
374   }
375   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
376 }
377
378 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
379 {
380   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
381         0 alltoallv 100 1 7 10 12 100 1 70 10 5
382      where:
383       1) 100 is the size of the send buffer *sizeof(int),
384       2) 1 7 10 12 is the sendcounts array
385       3) 100*sizeof(int) is the size of the receiver buffer
386       4)  1 70 10 5 is the recvcounts array
387   */
388   comm_size = MPI_COMM_WORLD->size();
389   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
390   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
391   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
392   senddisps  = std::vector<int>(comm_size, 0);
393   recvdisps  = std::vector<int>(comm_size, 0);
394
395   if (action.size() > 5 + 2 * comm_size)
396     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
397   if (action.size() > 5 + 2 * comm_size)
398     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
399
400   send_buf_size = parse_double(action[2]);
401   recv_buf_size = parse_double(action[3 + comm_size]);
402   for (unsigned int i = 0; i < comm_size; i++) {
403     (*sendcounts)[i] = std::stoi(action[3 + i]);
404     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
405   }
406   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
407   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
408 }
409
410 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
411 {
412   std::string s = boost::algorithm::join(action, " ");
413   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
414   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
415   req_storage.remove(request);
416
417   if (request == MPI_REQUEST_NULL) {
418     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
419      * return.*/
420     return;
421   }
422
423   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
424
425   // Must be taken before Request::wait() since the request may be set to
426   // MPI_REQUEST_NULL by Request::wait!
427   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
428   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
429   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
430
431   MPI_Status status;
432   Request::wait(&request, &status);
433
434   TRACE_smpi_comm_out(rank);
435   if (is_wait_for_receive)
436     TRACE_smpi_recv(args.src, args.dst, args.tag);
437 }
438
439 void SendAction::kernel(simgrid::xbt::ReplayAction&)
440 {
441   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
442
443   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
444         args.tag, Datatype::encode(args.datatype1)));
445   if (not TRACE_smpi_view_internals())
446     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
447
448   if (name == "send") {
449     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450   } else if (name == "isend") {
451     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452     req_storage.add(request);
453   } else {
454     xbt_die("Don't know this action, %s", name.c_str());
455   }
456
457   TRACE_smpi_comm_out(my_proc_id);
458 }
459
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
461 {
462   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
463         args.tag, Datatype::encode(args.datatype1)));
464
465   MPI_Status status;
466   // unknown size from the receiver point of view
467   if (args.size <= 0.0) {
468     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
469     args.size = status.count;
470   }
471
472   bool is_recv = false; // Help analyzers understanding that status is not used unintialized
473   if (name == "recv") {
474     is_recv = true;
475     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
476   } else if (name == "irecv") {
477     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
478     req_storage.add(request);
479   } else {
480     THROW_IMPOSSIBLE;
481   }
482
483   TRACE_smpi_comm_out(my_proc_id);
484   if (is_recv && not TRACE_smpi_view_internals()) {
485     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
486     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
487   }
488 }
489
490 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
491 {
492   if (simgrid::config::get_value<bool>("smpi/simulate-computation")) {
493     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
494   }
495 }
496
497 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
498 {
499   XBT_DEBUG("Sleep for: %lf secs", args.time);
500   int rank = simgrid::s4u::this_actor::get_pid();
501   TRACE_smpi_sleeping_in(rank, args.time);
502   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
503   TRACE_smpi_sleeping_out(rank);
504 }
505
506 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
507 {
508   smpi_trace_set_call_location(args.filename.c_str(), args.line);
509 }
510
511 void TestAction::kernel(simgrid::xbt::ReplayAction&)
512 {
513   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
514   req_storage.remove(request);
515   // if request is null here, this may mean that a previous test has succeeded
516   // Different times in traced application and replayed version may lead to this
517   // In this case, ignore the extra calls.
518   if (request != MPI_REQUEST_NULL) {
519     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
520
521     MPI_Status status;
522     int flag = 0;
523     Request::test(&request, &status, &flag);
524
525     XBT_DEBUG("MPI_Test result: %d", flag);
526     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
527      * nullptr.*/
528     if (request == MPI_REQUEST_NULL)
529       req_storage.addNullRequest(args.src, args.dst, args.tag);
530     else
531       req_storage.add(request);
532
533     TRACE_smpi_comm_out(my_proc_id);
534   }
535 }
536
537 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
538 {
539   CHECK_ACTION_PARAMS(action, 0, 1)
540     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
541     : MPI_BYTE;  // default TAU datatype
542
543   /* start a simulated timer */
544   smpi_process()->simulated_start();
545 }
546
547 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
548 {
549   /* nothing to do */
550 }
551
552 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
553 {
554   const unsigned int count_requests = req_storage.size();
555
556   if (count_requests > 0) {
557     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
558     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
559     std::vector<MPI_Request> reqs;
560     req_storage.get_requests(reqs);
561     for (const auto& req : reqs) {
562       if (req && (req->flags() & MPI_REQ_RECV)) {
563         sender_receiver.push_back({req->src(), req->dst()});
564       }
565     }
566     MPI_Status status[count_requests];
567     Request::waitall(count_requests, &(reqs.data())[0], status);
568     req_storage.get_store().clear();
569
570     for (auto& pair : sender_receiver) {
571       TRACE_smpi_recv(pair.first, pair.second, 0);
572     }
573     TRACE_smpi_comm_out(my_proc_id);
574   }
575 }
576
577 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
578 {
579   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
580   colls::barrier(MPI_COMM_WORLD);
581   TRACE_smpi_comm_out(my_proc_id);
582 }
583
584 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
585 {
586   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
587       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
588         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
589
590   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
591
592   TRACE_smpi_comm_out(my_proc_id);
593 }
594
595 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
596 {
597   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
598       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
599         args.comp_size, args.comm_size, -1,
600         Datatype::encode(args.datatype1), ""));
601
602   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
603                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
604                 args.root, MPI_COMM_WORLD);
605   private_execute_flops(args.comp_size);
606
607   TRACE_smpi_comm_out(my_proc_id);
608 }
609
610 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
611 {
612   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
613         Datatype::encode(args.datatype1), ""));
614
615   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
616                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
617                    MPI_COMM_WORLD);
618   private_execute_flops(args.comp_size);
619
620   TRACE_smpi_comm_out(my_proc_id);
621 }
622
623 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
624 {
625   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
626       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
627         Datatype::encode(args.datatype1),
628         Datatype::encode(args.datatype2)));
629
630   colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
631                   recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
632                   MPI_COMM_WORLD);
633
634   TRACE_smpi_comm_out(my_proc_id);
635 }
636
637 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
638 {
639   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
640         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
641
642   if (name == "gather") {
643     int rank = MPI_COMM_WORLD->rank();
644     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
645                   (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
646                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
647   }
648   else
649     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
650                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
651                      MPI_COMM_WORLD);
652
653   TRACE_smpi_comm_out(my_proc_id);
654 }
655
656 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
657 {
658   int rank = MPI_COMM_WORLD->rank();
659
660   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
661         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
662         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
663
664   if (name == "gatherv") {
665     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
666                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
667                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
668   }
669   else {
670     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
671                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
672                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
673   }
674
675   TRACE_smpi_comm_out(my_proc_id);
676 }
677
678 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
679 {
680   int rank = MPI_COMM_WORLD->rank();
681   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
682         Datatype::encode(args.datatype1),
683         Datatype::encode(args.datatype2)));
684
685   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
686                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
687                  args.datatype2, args.root, MPI_COMM_WORLD);
688
689   TRACE_smpi_comm_out(my_proc_id);
690 }
691
692 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
693 {
694   int rank = MPI_COMM_WORLD->rank();
695   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
696         nullptr, Datatype::encode(args.datatype1),
697         Datatype::encode(args.datatype2)));
698
699   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
700                   args.sendcounts->data(), args.disps.data(), args.datatype1,
701                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
702                   MPI_COMM_WORLD);
703
704   TRACE_smpi_comm_out(my_proc_id);
705 }
706
707 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
708 {
709   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
710       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
711         std::to_string(args.comp_size), /* ugly hack to print comp_size */
712         Datatype::encode(args.datatype1)));
713
714   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
715                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
716                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
717
718   private_execute_flops(args.comp_size);
719   TRACE_smpi_comm_out(my_proc_id);
720 }
721
722 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
723 {
724   TRACE_smpi_comm_in(my_proc_id, __func__,
725       new simgrid::instr::VarCollTIData(
726         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
727         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
728
729   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
730                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
731                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
732
733   TRACE_smpi_comm_out(my_proc_id);
734 }
735 } // Replay Namespace
736 }} // namespace simgrid::smpi
737
738 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
739 /** @brief Only initialize the replay, don't do it for real */
740 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
741 {
742   xbt_assert(not smpi_process()->initializing());
743
744   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
745   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
746   simgrid::smpi::ActorExt::init();
747
748   smpi_process()->mark_as_initialized();
749   smpi_process()->set_replaying(true);
750
751   int my_proc_id = simgrid::s4u::this_actor::get_pid();
752
753   TRACE_smpi_init(my_proc_id);
754   TRACE_smpi_computing_init(my_proc_id);
755   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
756   TRACE_smpi_comm_out(my_proc_id);
757   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
758   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
759   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
760   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
761   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
762   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
763   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
764   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
765   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
766   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
767   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
768   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
769   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
770   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
771   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
772   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
773   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
774   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
775   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
776   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
777   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
778   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
779   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
780   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
781   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
782   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
783   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
784   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
785
786   //if we have a delayed start, sleep here.
787   if (start_delay_flops > 0) {
788     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
789     private_execute_flops(start_delay_flops);
790   } else {
791     // Wait for the other actors to initialize also
792     simgrid::s4u::this_actor::yield();
793   }
794 }
795
796 /** @brief actually run the replay after initialization */
797 void smpi_replay_main(int rank, const char* trace_filename)
798 {
799   static int active_processes = 0;
800   active_processes++;
801   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
802   std::string rank_string                      = std::to_string(rank);
803   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
804
805   /* and now, finalize everything */
806   /* One active process will stop. Decrease the counter*/
807   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
808   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
809   if (count_requests > 0) {
810     MPI_Request requests[count_requests];
811     MPI_Status status[count_requests];
812     unsigned int i=0;
813
814     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
815       requests[i] = pair.second;
816       i++;
817     }
818     simgrid::smpi::Request::waitall(count_requests, requests, status);
819   }
820   active_processes--;
821
822   if(active_processes==0){
823     /* Last process alive speaking: end the simulated timer */
824     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
825     smpi_free_replay_tmp_buffers();
826   }
827
828   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
829                      new simgrid::instr::NoOpTIData("finalize"));
830
831   smpi_process()->finalize();
832
833   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
834 }
835
836 /** @brief chain a replay initialization and a replay start */
837 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
838 {
839   smpi_replay_init(instance_id, rank, start_delay_flops);
840   smpi_replay_main(rank, trace_filename);
841 }