Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into xbt_random
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 0)
171   time = parse_double(action[2]);
172 }
173
174 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 2, 0)
177   filename = std::string(action[2]);
178   line = std::stoi(action[3]);
179 }
180
181 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
182 {
183   CHECK_ACTION_PARAMS(action, 1, 2)
184   size = parse_double(action[2]);
185   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
186   if (action.size() > 4)
187     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
188 }
189
190 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
191 {
192   CHECK_ACTION_PARAMS(action, 2, 2)
193   comm_size = parse_double(action[2]);
194   comp_size = parse_double(action[3]);
195   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
196   if (action.size() > 5)
197     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
198 }
199
200 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 1)
203   comm_size = parse_double(action[2]);
204   comp_size = parse_double(action[3]);
205   if (action.size() > 4)
206     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
207 }
208
209 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   comm_size = MPI_COMM_WORLD->size();
213   send_size = parse_double(action[2]);
214   recv_size = parse_double(action[3]);
215
216   if (action.size() > 4)
217     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
218   if (action.size() > 5)
219     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
220 }
221
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
223 {
224   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
225         0 gather 68 68 0 0 0
226       where:
227         1) 68 is the sendcounts
228         2) 68 is the recvcounts
229         3) 0 is the root node
230         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
232   */
233   CHECK_ACTION_PARAMS(action, 2, 3)
234   comm_size = MPI_COMM_WORLD->size();
235   send_size = parse_double(action[2]);
236   recv_size = parse_double(action[3]);
237
238   if (name == "gather") {
239     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
240     if (action.size() > 5)
241       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
242     if (action.size() > 6)
243       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
244   } else {
245     if (action.size() > 4)
246       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
247     if (action.size() > 5)
248       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
249   }
250 }
251
252 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
253 {
254   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
255        0 gather 68 68 10 10 10 0 0 0
256      where:
257        1) 68 is the sendcount
258        2) 68 10 10 10 is the recvcounts
259        3) 0 is the root node
260        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
261        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
262   */
263   comm_size = MPI_COMM_WORLD->size();
264   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
265   send_size  = parse_double(action[2]);
266   disps      = std::vector<int>(comm_size, 0);
267   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
268
269   if (name == "gatherv") {
270     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
271     if (action.size() > 4 + comm_size)
272       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
273     if (action.size() > 5 + comm_size)
274       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
275   } else {
276     int disp_index     = 0;
277     /* The 3 comes from "0 gather <sendcount>", which must always be present.
278      * The + comm_size is the recvcounts array, which must also be present
279      */
280     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
281       int datatype_index = 3 + comm_size;
282       disp_index     = datatype_index + 1;
283       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
284       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
285     } else if (action.size() >
286                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
287       disp_index = 3 + comm_size;
288     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
289       int datatype_index = 3 + comm_size;
290       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
291       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
292     }
293
294     if (disp_index != 0) {
295       for (unsigned int i = 0; i < comm_size; i++)
296         disps[i]          = std::stoi(action[disp_index + i]);
297     }
298   }
299
300   for (unsigned int i = 0; i < comm_size; i++) {
301     (*recvcounts)[i] = std::stoi(action[i + 3]);
302   }
303   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
304 }
305
306 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
307 {
308   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
309         0 gather 68 68 0 0 0
310       where:
311         1) 68 is the sendcounts
312         2) 68 is the recvcounts
313         3) 0 is the root node
314         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
315         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
316   */
317   CHECK_ACTION_PARAMS(action, 2, 3)
318   comm_size = MPI_COMM_WORLD->size();
319   send_size = parse_double(action[2]);
320   recv_size = parse_double(action[3]);
321   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
322   if (action.size() > 5)
323     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
324   if (action.size() > 6)
325     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
326 }
327
328 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
329 {
330   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
331      0 gather 68 10 10 10 68 0 0 0
332       where:
333       1) 68 10 10 10 is the sendcounts
334       2) 68 is the recvcount
335       3) 0 is the root node
336       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
337       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
338   */
339   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
340   recv_size  = parse_double(action[2 + comm_size]);
341   disps      = std::vector<int>(comm_size, 0);
342   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
343
344   if (action.size() > 5 + comm_size)
345     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
346   if (action.size() > 5 + comm_size)
347     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
348
349   for (unsigned int i = 0; i < comm_size; i++) {
350     (*sendcounts)[i] = std::stoi(action[i + 2]);
351   }
352   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
353   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
354 }
355
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
357 {
358   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359        0 reducescatter 275427 275427 275427 204020 11346849 0
360      where:
361        1) The first four values after the name of the action declare the recvcounts array
362        2) The value 11346849 is the amount of instructions
363        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
364   */
365   comm_size = MPI_COMM_WORLD->size();
366   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367   comp_size  = parse_double(action[2 + comm_size]);
368   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
369   if (action.size() > 3 + comm_size)
370     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
371
372   for (unsigned int i = 0; i < comm_size; i++) {
373     recvcounts->push_back(std::stoi(action[i + 2]));
374   }
375   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
376 }
377
378 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
379 {
380   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
381         0 alltoallv 100 1 7 10 12 100 1 70 10 5
382      where:
383       1) 100 is the size of the send buffer *sizeof(int),
384       2) 1 7 10 12 is the sendcounts array
385       3) 100*sizeof(int) is the size of the receiver buffer
386       4)  1 70 10 5 is the recvcounts array
387   */
388   comm_size = MPI_COMM_WORLD->size();
389   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
390   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
391   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
392   senddisps  = std::vector<int>(comm_size, 0);
393   recvdisps  = std::vector<int>(comm_size, 0);
394
395   if (action.size() > 5 + 2 * comm_size)
396     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
397   if (action.size() > 5 + 2 * comm_size)
398     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
399
400   send_buf_size = parse_double(action[2]);
401   recv_buf_size = parse_double(action[3 + comm_size]);
402   for (unsigned int i = 0; i < comm_size; i++) {
403     (*sendcounts)[i] = std::stoi(action[3 + i]);
404     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
405   }
406   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
407   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
408 }
409
410 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
411 {
412   std::string s = boost::algorithm::join(action, " ");
413   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
414   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
415   req_storage.remove(request);
416
417   if (request == MPI_REQUEST_NULL) {
418     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
419      * return.*/
420     return;
421   }
422
423   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
424
425   // Must be taken before Request::wait() since the request may be set to
426   // MPI_REQUEST_NULL by Request::wait!
427   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
428   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
429   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
430
431   MPI_Status status;
432   Request::wait(&request, &status);
433
434   TRACE_smpi_comm_out(rank);
435   if (is_wait_for_receive)
436     TRACE_smpi_recv(args.src, args.dst, args.tag);
437 }
438
439 void SendAction::kernel(simgrid::xbt::ReplayAction&)
440 {
441   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
442
443   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
444         args.tag, Datatype::encode(args.datatype1)));
445   if (not TRACE_smpi_view_internals())
446     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
447
448   if (name == "send") {
449     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450   } else if (name == "isend") {
451     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452     req_storage.add(request);
453   } else {
454     xbt_die("Don't know this action, %s", name.c_str());
455   }
456
457   TRACE_smpi_comm_out(my_proc_id);
458 }
459
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
461 {
462   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
463         args.tag, Datatype::encode(args.datatype1)));
464
465   MPI_Status status;
466   // unknown size from the receiver point of view
467   if (args.size <= 0.0) {
468     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
469     args.size = status.count;
470   }
471
472   bool is_recv = false; // Help analyzers understanding that status is not used unintialized
473   if (name == "recv") {
474     is_recv = true;
475     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
476   } else if (name == "irecv") {
477     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
478     req_storage.add(request);
479   } else {
480     THROW_IMPOSSIBLE;
481   }
482
483   TRACE_smpi_comm_out(my_proc_id);
484   if (is_recv && not TRACE_smpi_view_internals()) {
485     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
486     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
487   }
488 }
489
490 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
491 {
492   if (simgrid::config::get_value<bool>("smpi/simulate-computation")) {
493     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
494   }
495 }
496
497 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
498 {
499   XBT_DEBUG("Sleep for: %lf secs", args.time);
500   int rank = simgrid::s4u::this_actor::get_pid();
501   TRACE_smpi_sleeping_in(rank, args.time);
502   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
503   TRACE_smpi_sleeping_out(rank);
504 }
505
506 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
507 {
508   smpi_trace_set_call_location(args.filename.c_str(), args.line);
509 }
510
511 void TestAction::kernel(simgrid::xbt::ReplayAction&)
512 {
513   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
514   req_storage.remove(request);
515   // if request is null here, this may mean that a previous test has succeeded
516   // Different times in traced application and replayed version may lead to this
517   // In this case, ignore the extra calls.
518   if (request != MPI_REQUEST_NULL) {
519     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
520
521     MPI_Status status;
522     int flag = 0;
523     Request::test(&request, &status, &flag);
524
525     XBT_DEBUG("MPI_Test result: %d", flag);
526     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
527      * nullptr.*/
528     if (request == MPI_REQUEST_NULL)
529       req_storage.addNullRequest(args.src, args.dst, args.tag);
530     else
531       req_storage.add(request);
532
533     TRACE_smpi_comm_out(my_proc_id);
534   }
535 }
536
537 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
538 {
539   CHECK_ACTION_PARAMS(action, 0, 1)
540     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
541     : MPI_BYTE;  // default TAU datatype
542
543   /* start a simulated timer */
544   smpi_process()->simulated_start();
545 }
546
547 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
548 {
549   /* nothing to do */
550 }
551
552 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
553 {
554   const unsigned int count_requests = req_storage.size();
555
556   if (count_requests > 0) {
557     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
558     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
559     std::vector<MPI_Request> reqs;
560     req_storage.get_requests(reqs);
561     for (const auto& req : reqs) {
562       if (req && (req->flags() & MPI_REQ_RECV)) {
563         sender_receiver.push_back({req->src(), req->dst()});
564       }
565     }
566     MPI_Status status[count_requests];
567     Request::waitall(count_requests, &(reqs.data())[0], status);
568     req_storage.get_store().clear();
569
570     for (auto& pair : sender_receiver) {
571       TRACE_smpi_recv(pair.first, pair.second, 0);
572     }
573     TRACE_smpi_comm_out(my_proc_id);
574   }
575 }
576
577 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
578 {
579   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
580   Colls::barrier(MPI_COMM_WORLD);
581   TRACE_smpi_comm_out(my_proc_id);
582 }
583
584 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
585 {
586   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
587       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
588         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
589
590   Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
591
592   TRACE_smpi_comm_out(my_proc_id);
593 }
594
595 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
596 {
597   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
598       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
599         args.comp_size, args.comm_size, -1,
600         Datatype::encode(args.datatype1), ""));
601
602   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
603       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
604   private_execute_flops(args.comp_size);
605
606   TRACE_smpi_comm_out(my_proc_id);
607 }
608
609 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
610 {
611   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
612         Datatype::encode(args.datatype1), ""));
613
614   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
615       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
616   private_execute_flops(args.comp_size);
617
618   TRACE_smpi_comm_out(my_proc_id);
619 }
620
621 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
622 {
623   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
624       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
625         Datatype::encode(args.datatype1),
626         Datatype::encode(args.datatype2)));
627
628   Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
629       args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
630       args.recv_size, args.datatype2, MPI_COMM_WORLD);
631
632   TRACE_smpi_comm_out(my_proc_id);
633 }
634
635 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
636 {
637   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
638         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
639
640   if (name == "gather") {
641     int rank = MPI_COMM_WORLD->rank();
642     Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
643         (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
644   }
645   else
646     Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
647         recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
648
649   TRACE_smpi_comm_out(my_proc_id);
650 }
651
652 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
653 {
654   int rank = MPI_COMM_WORLD->rank();
655
656   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
657         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
658         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
659
660   if (name == "gatherv") {
661     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
663         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
664   }
665   else {
666     Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
667         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
668         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
669   }
670
671   TRACE_smpi_comm_out(my_proc_id);
672 }
673
674 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
675 {
676   int rank = MPI_COMM_WORLD->rank();
677   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
678         Datatype::encode(args.datatype1),
679         Datatype::encode(args.datatype2)));
680
681   Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
682       (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
683
684   TRACE_smpi_comm_out(my_proc_id);
685 }
686
687 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
688 {
689   int rank = MPI_COMM_WORLD->rank();
690   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
691         nullptr, Datatype::encode(args.datatype1),
692         Datatype::encode(args.datatype2)));
693
694   Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
695       args.sendcounts->data(), args.disps.data(), args.datatype1,
696       recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
697       MPI_COMM_WORLD);
698
699   TRACE_smpi_comm_out(my_proc_id);
700 }
701
702 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
703 {
704   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
705       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
706         std::to_string(args.comp_size), /* ugly hack to print comp_size */
707         Datatype::encode(args.datatype1)));
708
709   Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
710       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
711       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
712
713   private_execute_flops(args.comp_size);
714   TRACE_smpi_comm_out(my_proc_id);
715 }
716
717 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
718 {
719   TRACE_smpi_comm_in(my_proc_id, __func__,
720       new simgrid::instr::VarCollTIData(
721         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
722         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
723
724   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
725       recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
726
727   TRACE_smpi_comm_out(my_proc_id);
728 }
729 } // Replay Namespace
730 }} // namespace simgrid::smpi
731
732 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
733 /** @brief Only initialize the replay, don't do it for real */
734 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
735 {
736   xbt_assert(not smpi_process()->initializing());
737
738   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
739   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
740   simgrid::smpi::ActorExt::init();
741
742   smpi_process()->mark_as_initialized();
743   smpi_process()->set_replaying(true);
744
745   int my_proc_id = simgrid::s4u::this_actor::get_pid();
746
747   TRACE_smpi_init(my_proc_id);
748   TRACE_smpi_computing_init(my_proc_id);
749   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
750   TRACE_smpi_comm_out(my_proc_id);
751   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
752   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
753   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
754   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
755   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
756   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
757   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
758   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
759   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
760   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
761   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
762   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
763   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
764   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
765   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
766   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
767   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
768   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
769   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
770   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
771   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
772   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
773   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
774   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
775   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
776   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
777   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
778   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
779
780   //if we have a delayed start, sleep here.
781   if (start_delay_flops > 0) {
782     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
783     private_execute_flops(start_delay_flops);
784   } else {
785     // Wait for the other actors to initialize also
786     simgrid::s4u::this_actor::yield();
787   }
788 }
789
790 /** @brief actually run the replay after initialization */
791 void smpi_replay_main(int rank, const char* trace_filename)
792 {
793   static int active_processes = 0;
794   active_processes++;
795   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
796   std::string rank_string                      = std::to_string(rank);
797   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
798
799   /* and now, finalize everything */
800   /* One active process will stop. Decrease the counter*/
801   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
802   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
803   if (count_requests > 0) {
804     MPI_Request requests[count_requests];
805     MPI_Status status[count_requests];
806     unsigned int i=0;
807
808     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
809       requests[i] = pair.second;
810       i++;
811     }
812     simgrid::smpi::Request::waitall(count_requests, requests, status);
813   }
814   active_processes--;
815
816   if(active_processes==0){
817     /* Last process alive speaking: end the simulated timer */
818     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
819     smpi_free_replay_tmp_buffers();
820   }
821
822   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
823                      new simgrid::instr::NoOpTIData("finalize"));
824
825   smpi_process()->finalize();
826
827   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
828 }
829
830 /** @brief chain a replay initialization and a replay start */
831 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
832 {
833   smpi_replay_init(instance_id, rank, start_delay_flops);
834   smpi_replay_main(rank, trace_filename);
835 }