Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
b0c8bd973feebe22dbd2acddeb8a6e3906081435
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 0)
171   time = parse_double(action[2]);
172 }
173
174 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 2, 0)
177   filename = std::string(action[2]);
178   line = std::stoi(action[3]);
179 }
180
181 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 {
183   CHECK_ACTION_PARAMS(action, 1, 2)
184   size = parse_double(action[2]);
185   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
186   if (action.size() > 4)
187     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
188 }
189
190 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 {
192   CHECK_ACTION_PARAMS(action, 2, 2)
193   comm_size = parse_double(action[2]);
194   comp_size = parse_double(action[3]);
195   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
196   if (action.size() > 5)
197     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
198 }
199
200 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 1)
203   comm_size = parse_double(action[2]);
204   comp_size = parse_double(action[3]);
205   if (action.size() > 4)
206     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
207 }
208
209 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   comm_size = MPI_COMM_WORLD->size();
213   send_size = parse_double(action[2]);
214   recv_size = parse_double(action[3]);
215
216   if (action.size() > 4)
217     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
218   if (action.size() > 5)
219     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
220 }
221
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
223 {
224   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
225         0 gather 68 68 0 0 0
226       where:
227         1) 68 is the sendcounts
228         2) 68 is the recvcounts
229         3) 0 is the root node
230         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
232   */
233   CHECK_ACTION_PARAMS(action, 2, 3)
234   comm_size = MPI_COMM_WORLD->size();
235   send_size = parse_double(action[2]);
236   recv_size = parse_double(action[3]);
237
238   if (name == "gather") {
239     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
240     if (action.size() > 5)
241       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
242     if (action.size() > 6)
243       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
244   } else {
245     if (action.size() > 4)
246       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
247     if (action.size() > 5)
248       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
249   }
250 }
251
252 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
253 {
254   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
255        0 gather 68 68 10 10 10 0 0 0
256      where:
257        1) 68 is the sendcount
258        2) 68 10 10 10 is the recvcounts
259        3) 0 is the root node
260        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
261        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
262   */
263   comm_size = MPI_COMM_WORLD->size();
264   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
265   send_size  = parse_double(action[2]);
266   disps      = std::vector<int>(comm_size, 0);
267   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
268
269   if (name == "gatherv") {
270     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
271     if (action.size() > 4 + comm_size)
272       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
273     if (action.size() > 5 + comm_size)
274       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
275   } else {
276     int disp_index     = 0;
277     /* The 3 comes from "0 gather <sendcount>", which must always be present.
278      * The + comm_size is the recvcounts array, which must also be present
279      */
280     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
281       int datatype_index = 3 + comm_size;
282       disp_index     = datatype_index + 1;
283       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
284       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
285     } else if (action.size() >
286                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
287       disp_index = 3 + comm_size;
288     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
289       int datatype_index = 3 + comm_size;
290       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
291       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
292     }
293
294     if (disp_index != 0) {
295       for (unsigned int i = 0; i < comm_size; i++)
296         disps[i]          = std::stoi(action[disp_index + i]);
297     }
298   }
299
300   for (unsigned int i = 0; i < comm_size; i++) {
301     (*recvcounts)[i] = std::stoi(action[i + 3]);
302   }
303   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
304 }
305
306 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
307 {
308   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
309         0 gather 68 68 0 0 0
310       where:
311         1) 68 is the sendcounts
312         2) 68 is the recvcounts
313         3) 0 is the root node
314         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
315         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
316   */
317   CHECK_ACTION_PARAMS(action, 2, 3)
318   comm_size = MPI_COMM_WORLD->size();
319   send_size = parse_double(action[2]);
320   recv_size = parse_double(action[3]);
321   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
322   if (action.size() > 5)
323     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
324   if (action.size() > 6)
325     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
326 }
327
328 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
329 {
330   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
331      0 gather 68 10 10 10 68 0 0 0
332       where:
333       1) 68 10 10 10 is the sendcounts
334       2) 68 is the recvcount
335       3) 0 is the root node
336       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
337       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
338   */
339   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
340   recv_size  = parse_double(action[2 + comm_size]);
341   disps      = std::vector<int>(comm_size, 0);
342   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
343
344   if (action.size() > 5 + comm_size)
345     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
346   if (action.size() > 5 + comm_size)
347     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
348
349   for (unsigned int i = 0; i < comm_size; i++) {
350     (*sendcounts)[i] = std::stoi(action[i + 2]);
351   }
352   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
353   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
354 }
355
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
357 {
358   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359        0 reducescatter 275427 275427 275427 204020 11346849 0
360      where:
361        1) The first four values after the name of the action declare the recvcounts array
362        2) The value 11346849 is the amount of instructions
363        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
364   */
365   comm_size = MPI_COMM_WORLD->size();
366   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367   comp_size  = parse_double(action[2 + comm_size]);
368   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
369   if (action.size() > 3 + comm_size)
370     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
371
372   for (unsigned int i = 0; i < comm_size; i++) {
373     recvcounts->push_back(std::stoi(action[i + 2]));
374   }
375   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
376 }
377
378 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
379 {
380   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
381         0 alltoallv 100 1 7 10 12 100 1 70 10 5
382      where:
383       1) 100 is the size of the send buffer *sizeof(int),
384       2) 1 7 10 12 is the sendcounts array
385       3) 100*sizeof(int) is the size of the receiver buffer
386       4)  1 70 10 5 is the recvcounts array
387   */
388   comm_size = MPI_COMM_WORLD->size();
389   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
390   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
391   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
392   senddisps  = std::vector<int>(comm_size, 0);
393   recvdisps  = std::vector<int>(comm_size, 0);
394
395   if (action.size() > 5 + 2 * comm_size)
396     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
397   if (action.size() > 5 + 2 * comm_size)
398     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
399
400   send_buf_size = parse_double(action[2]);
401   recv_buf_size = parse_double(action[3 + comm_size]);
402   for (unsigned int i = 0; i < comm_size; i++) {
403     (*sendcounts)[i] = std::stoi(action[3 + i]);
404     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
405   }
406   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
407   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
408 }
409
410 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
411 {
412   std::string s = boost::algorithm::join(action, " ");
413   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
414   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
415   req_storage.remove(request);
416
417   if (request == MPI_REQUEST_NULL) {
418     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
419      * return.*/
420     return;
421   }
422
423   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
424
425   // Must be taken before Request::wait() since the request may be set to
426   // MPI_REQUEST_NULL by Request::wait!
427   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
428   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
429   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
430
431   MPI_Status status;
432   Request::wait(&request, &status);
433
434   TRACE_smpi_comm_out(rank);
435   if (is_wait_for_receive)
436     TRACE_smpi_recv(args.src, args.dst, args.tag);
437 }
438
439 void SendAction::kernel(simgrid::xbt::ReplayAction&)
440 {
441   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
442
443   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
444         args.tag, Datatype::encode(args.datatype1)));
445   if (not TRACE_smpi_view_internals())
446     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
447
448   if (name == "send") {
449     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450   } else if (name == "isend") {
451     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452     req_storage.add(request);
453   } else {
454     xbt_die("Don't know this action, %s", name.c_str());
455   }
456
457   TRACE_smpi_comm_out(my_proc_id);
458 }
459
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
461 {
462   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
463         args.tag, Datatype::encode(args.datatype1)));
464
465   MPI_Status status;
466   // unknown size from the receiver point of view
467   if (args.size <= 0.0) {
468     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
469     args.size = status.count;
470   }
471
472   bool is_recv = false; // Help analyzers understanding that status is not used unintialized
473   if (name == "recv") {
474     is_recv = true;
475     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
476   } else if (name == "irecv") {
477     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
478     req_storage.add(request);
479   } else {
480     THROW_IMPOSSIBLE;
481   }
482
483   TRACE_smpi_comm_out(my_proc_id);
484   if (is_recv && not TRACE_smpi_view_internals()) {
485     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
486     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
487   }
488 }
489
490 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
491 {
492   if (simgrid::config::get_value<bool>("smpi/simulate-computation")) {
493     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
494   }
495 }
496
497 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
498 {
499   XBT_DEBUG("Sleep for: %lf secs", args.time);
500   int rank = simgrid::s4u::this_actor::get_pid();
501   TRACE_smpi_sleeping_in(rank, args.time);
502   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
503   TRACE_smpi_sleeping_out(rank);
504 }
505
506 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
507 {
508   smpi_trace_set_call_location(args.filename.c_str(), args.line);
509 }
510
511 void TestAction::kernel(simgrid::xbt::ReplayAction&)
512 {
513   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
514   req_storage.remove(request);
515   // if request is null here, this may mean that a previous test has succeeded
516   // Different times in traced application and replayed version may lead to this
517   // In this case, ignore the extra calls.
518   if (request != MPI_REQUEST_NULL) {
519     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
520
521     MPI_Status status;
522     int flag = 0;
523     Request::test(&request, &status, &flag);
524
525     XBT_DEBUG("MPI_Test result: %d", flag);
526     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
527      * nullptr.*/
528     if (request == MPI_REQUEST_NULL)
529       req_storage.addNullRequest(args.src, args.dst, args.tag);
530     else
531       req_storage.add(request);
532
533     TRACE_smpi_comm_out(my_proc_id);
534   }
535 }
536
537 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
538 {
539   CHECK_ACTION_PARAMS(action, 0, 1)
540     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
541     : MPI_BYTE;  // default TAU datatype
542
543   /* start a simulated timer */
544   smpi_process()->simulated_start();
545 }
546
547 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
548 {
549   /* nothing to do */
550 }
551
552 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
553 {
554   const unsigned int count_requests = req_storage.size();
555
556   if (count_requests > 0) {
557     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
558     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
559     std::vector<MPI_Request> reqs;
560     req_storage.get_requests(reqs);
561     for (const auto& req : reqs) {
562       if (req && (req->flags() & MPI_REQ_RECV)) {
563         sender_receiver.push_back({req->src(), req->dst()});
564       }
565     }
566     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
567     req_storage.get_store().clear();
568
569     for (auto& pair : sender_receiver) {
570       TRACE_smpi_recv(pair.first, pair.second, 0);
571     }
572     TRACE_smpi_comm_out(my_proc_id);
573   }
574 }
575
576 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
577 {
578   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
579   colls::barrier(MPI_COMM_WORLD);
580   TRACE_smpi_comm_out(my_proc_id);
581 }
582
583 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
584 {
585   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
586       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
587         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
588
589   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
590
591   TRACE_smpi_comm_out(my_proc_id);
592 }
593
594 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
595 {
596   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
597       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
598         args.comp_size, args.comm_size, -1,
599         Datatype::encode(args.datatype1), ""));
600
601   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
602                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
603                 args.root, MPI_COMM_WORLD);
604   private_execute_flops(args.comp_size);
605
606   TRACE_smpi_comm_out(my_proc_id);
607 }
608
609 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
610 {
611   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
612         Datatype::encode(args.datatype1), ""));
613
614   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
615                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
616                    MPI_COMM_WORLD);
617   private_execute_flops(args.comp_size);
618
619   TRACE_smpi_comm_out(my_proc_id);
620 }
621
622 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
623 {
624   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
625       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
626         Datatype::encode(args.datatype1),
627         Datatype::encode(args.datatype2)));
628
629   colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
630                   recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
631                   MPI_COMM_WORLD);
632
633   TRACE_smpi_comm_out(my_proc_id);
634 }
635
636 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
637 {
638   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
639         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
640
641   if (name == "gather") {
642     int rank = MPI_COMM_WORLD->rank();
643     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
644                   (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
645                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
646   }
647   else
648     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
649                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
650                      MPI_COMM_WORLD);
651
652   TRACE_smpi_comm_out(my_proc_id);
653 }
654
655 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
656 {
657   int rank = MPI_COMM_WORLD->rank();
658
659   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
660         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
661         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
662
663   if (name == "gatherv") {
664     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
665                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
666                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
667   }
668   else {
669     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
670                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
671                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
672   }
673
674   TRACE_smpi_comm_out(my_proc_id);
675 }
676
677 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
678 {
679   int rank = MPI_COMM_WORLD->rank();
680   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
681         Datatype::encode(args.datatype1),
682         Datatype::encode(args.datatype2)));
683
684   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
685                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
686                  args.datatype2, args.root, MPI_COMM_WORLD);
687
688   TRACE_smpi_comm_out(my_proc_id);
689 }
690
691 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
692 {
693   int rank = MPI_COMM_WORLD->rank();
694   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
695         nullptr, Datatype::encode(args.datatype1),
696         Datatype::encode(args.datatype2)));
697
698   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
699                   args.sendcounts->data(), args.disps.data(), args.datatype1,
700                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
701                   MPI_COMM_WORLD);
702
703   TRACE_smpi_comm_out(my_proc_id);
704 }
705
706 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
707 {
708   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
709       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
710         std::to_string(args.comp_size), /* ugly hack to print comp_size */
711         Datatype::encode(args.datatype1)));
712
713   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
714                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
715                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
716
717   private_execute_flops(args.comp_size);
718   TRACE_smpi_comm_out(my_proc_id);
719 }
720
721 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
722 {
723   TRACE_smpi_comm_in(my_proc_id, __func__,
724       new simgrid::instr::VarCollTIData(
725         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
726         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
727
728   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
729                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
730                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
731
732   TRACE_smpi_comm_out(my_proc_id);
733 }
734 } // Replay Namespace
735 }} // namespace simgrid::smpi
736
737 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
738 /** @brief Only initialize the replay, don't do it for real */
739 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
740 {
741   xbt_assert(not smpi_process()->initializing());
742
743   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
744   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
745   simgrid::smpi::ActorExt::init();
746
747   smpi_process()->mark_as_initialized();
748   smpi_process()->set_replaying(true);
749
750   int my_proc_id = simgrid::s4u::this_actor::get_pid();
751
752   TRACE_smpi_init(my_proc_id);
753   TRACE_smpi_computing_init(my_proc_id);
754   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
755   TRACE_smpi_comm_out(my_proc_id);
756   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
757   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
758   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
759   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
760   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
761   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
762   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
763   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
764   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
765   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
766   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
767   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
768   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
769   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
770   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
771   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
772   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
773   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
774   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
775   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
776   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
777   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
778   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
779   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
780   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
781   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
782   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
783   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
784
785   //if we have a delayed start, sleep here.
786   if (start_delay_flops > 0) {
787     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
788     private_execute_flops(start_delay_flops);
789   } else {
790     // Wait for the other actors to initialize also
791     simgrid::s4u::this_actor::yield();
792   }
793 }
794
795 /** @brief actually run the replay after initialization */
796 void smpi_replay_main(int rank, const char* trace_filename)
797 {
798   static int active_processes = 0;
799   active_processes++;
800   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
801   std::string rank_string                      = std::to_string(rank);
802   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
803
804   /* and now, finalize everything */
805   /* One active process will stop. Decrease the counter*/
806   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
807   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
808   if (count_requests > 0) {
809     MPI_Request* requests= new MPI_Request[count_requests];
810     unsigned int i=0;
811
812     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
813       requests[i] = pair.second;
814       i++;
815     }
816     simgrid::smpi::Request::waitall(count_requests, requests, MPI_STATUSES_IGNORE);
817     delete[] requests;
818   }
819   active_processes--;
820
821   if(active_processes==0){
822     /* Last process alive speaking: end the simulated timer */
823     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
824     smpi_free_replay_tmp_buffers();
825   }
826
827   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
828                      new simgrid::instr::NoOpTIData("finalize"));
829
830   smpi_process()->finalize();
831
832   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
833 }
834
835 /** @brief chain a replay initialization and a replay start */
836 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
837 {
838   smpi_replay_init(instance_id, rank, start_delay_flops);
839   smpi_replay_main(rank, trace_filename);
840 }