Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[lgtm] Ensure that the type casting is done before multiplication to prevent overflow.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <cmath>
16 #include <limits>
17 #include <memory>
18 #include <numeric>
19 #include <tuple>
20 #include <unordered_map>
21 #include <vector>
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24
25 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
26 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
27 // this could go into a header file.
28 namespace hash_tuple {
29 template <typename TT> class hash {
30 public:
31   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
32 };
33
34 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
35 {
36   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
37 }
38
39 // Recursive template code derived from Matthieu M.
40 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
41 public:
42   static void apply(size_t& seed, Tuple const& tuple)
43   {
44     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
45     hash_combine(seed, std::get<Index>(tuple));
46   }
47 };
48
49 template <class Tuple> class HashValueImpl<Tuple, 0> {
50 public:
51   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
52 };
53
54 template <typename... TT> class hash<std::tuple<TT...>> {
55 public:
56   size_t operator()(std::tuple<TT...> const& tt) const
57   {
58     size_t seed = 0;
59     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
60     return seed;
61   }
62 };
63 }
64
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
66 {
67   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68     std::string s = boost::algorithm::join(action, " ");
69     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
70   }
71 }
72
73 /* Helper function */
74 static double parse_double(const std::string& string)
75 {
76   return xbt_str_parse_double(string.c_str(), "not a double");
77 }
78
79 namespace simgrid {
80 namespace smpi {
81
82 namespace replay {
83 MPI_Datatype MPI_DEFAULT_TYPE;
84
85 class RequestStorage {
86 private:
87   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
88   using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
89
90   req_storage_t store;
91
92 public:
93   RequestStorage() = default;
94   size_t size() const { return store.size(); }
95
96   req_storage_t& get_store() { return store; }
97
98   void get_requests(std::vector<MPI_Request>& vec) const
99   {
100     for (auto const& pair : store) {
101       auto& req       = pair.second;
102       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
103       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
104         vec.push_back(pair.second);
105         pair.second->print_request("MM");
106       }
107     }
108   }
109
110     MPI_Request find(int src, int dst, int tag)
111     {
112       auto it = store.find(req_key_t(src, dst, tag));
113       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
114     }
115
116     void remove(const Request* req)
117     {
118       if (req == MPI_REQUEST_NULL) return;
119
120       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
121     }
122
123     void add(MPI_Request req)
124     {
125       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
126         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
127     }
128
129     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
130     void addNullRequest(int src, int dst, int tag)
131     {
132       store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
133                     MPI_REQUEST_NULL});
134     }
135 };
136
137 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
138 {
139   CHECK_ACTION_PARAMS(action, 3, 0)
140   src = std::stoi(action[2]);
141   dst = std::stoi(action[3]);
142   tag = std::stoi(action[4]);
143 }
144
145 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 {
147   CHECK_ACTION_PARAMS(action, 3, 1)
148   partner = std::stoi(action[2]);
149   tag     = std::stoi(action[3]);
150   size    = parse_double(action[4]);
151   if (action.size() > 5)
152     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
153 }
154
155 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
156 {
157   CHECK_ACTION_PARAMS(action, 1, 0)
158   flops = parse_double(action[2]);
159 }
160
161 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
162 {
163   CHECK_ACTION_PARAMS(action, 1, 0)
164   time = parse_double(action[2]);
165 }
166
167 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
168 {
169   CHECK_ACTION_PARAMS(action, 2, 0)
170   filename = std::string(action[2]);
171   line = std::stoi(action[3]);
172 }
173
174 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 1, 2)
177   size = parse_double(action[2]);
178   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
179   if (action.size() > 4)
180     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
181 }
182
183 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
184 {
185   CHECK_ACTION_PARAMS(action, 2, 2)
186   double arg2 = trunc(parse_double(action[2]));
187   xbt_assert(0.0 <= arg2 && arg2 <= static_cast<double>(std::numeric_limits<unsigned>::max()));
188   comm_size = static_cast<unsigned>(arg2);
189   comp_size = parse_double(action[3]);
190   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
191   if (action.size() > 5)
192     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
193 }
194
195 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
196 {
197   CHECK_ACTION_PARAMS(action, 2, 1)
198   double arg2 = trunc(parse_double(action[2]));
199   xbt_assert(0.0 <= arg2 && arg2 <= static_cast<double>(std::numeric_limits<unsigned>::max()));
200   comm_size = static_cast<unsigned>(arg2);
201   comp_size = parse_double(action[3]);
202   if (action.size() > 4)
203     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
204 }
205
206 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
207 {
208   CHECK_ACTION_PARAMS(action, 2, 1)
209   comm_size = MPI_COMM_WORLD->size();
210   send_size = parse_double(action[2]);
211   recv_size = parse_double(action[3]);
212
213   if (action.size() > 4)
214     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
215   if (action.size() > 5)
216     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
217 }
218
219 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
220 {
221   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
222         0 gather 68 68 0 0 0
223       where:
224         1) 68 is the sendcounts
225         2) 68 is the recvcounts
226         3) 0 is the root node
227         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
228         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
229   */
230   CHECK_ACTION_PARAMS(action, 2, 3)
231   comm_size = MPI_COMM_WORLD->size();
232   send_size = parse_double(action[2]);
233   recv_size = parse_double(action[3]);
234
235   if (name == "gather") {
236     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
237     if (action.size() > 5)
238       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
239     if (action.size() > 6)
240       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
241   } else {
242     if (action.size() > 4)
243       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
244     if (action.size() > 5)
245       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
246   }
247 }
248
249 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
250 {
251   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
252        0 gather 68 68 10 10 10 0 0 0
253      where:
254        1) 68 is the sendcount
255        2) 68 10 10 10 is the recvcounts
256        3) 0 is the root node
257        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
258        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
259   */
260   comm_size = MPI_COMM_WORLD->size();
261   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
262   send_size  = parse_double(action[2]);
263   disps      = std::vector<int>(comm_size, 0);
264   recvcounts = std::make_shared<std::vector<int>>(comm_size);
265
266   if (name == "gatherv") {
267     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
268     if (action.size() > 4 + comm_size)
269       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
270     if (action.size() > 5 + comm_size)
271       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
272   } else {
273     int disp_index     = 0;
274     /* The 3 comes from "0 gather <sendcount>", which must always be present.
275      * The + comm_size is the recvcounts array, which must also be present
276      */
277     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
278       int datatype_index = 3 + comm_size;
279       disp_index     = datatype_index + 1;
280       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
281       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
282     } else if (action.size() >
283                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
284       disp_index = 3 + comm_size;
285     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
286       int datatype_index = 3 + comm_size;
287       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
288       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
289     }
290
291     if (disp_index != 0) {
292       for (unsigned int i = 0; i < comm_size; i++)
293         disps[i]          = std::stoi(action[disp_index + i]);
294     }
295   }
296
297   for (unsigned int i = 0; i < comm_size; i++) {
298     (*recvcounts)[i] = std::stoi(action[i + 3]);
299   }
300   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
301 }
302
303 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
304 {
305   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
306         0 gather 68 68 0 0 0
307       where:
308         1) 68 is the sendcounts
309         2) 68 is the recvcounts
310         3) 0 is the root node
311         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
312         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
313   */
314   CHECK_ACTION_PARAMS(action, 2, 3)
315   comm_size = MPI_COMM_WORLD->size();
316   send_size = parse_double(action[2]);
317   recv_size = parse_double(action[3]);
318   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
319   if (action.size() > 5)
320     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
321   if (action.size() > 6)
322     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
323 }
324
325 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
326 {
327   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
328      0 gather 68 10 10 10 68 0 0 0
329       where:
330       1) 68 10 10 10 is the sendcounts
331       2) 68 is the recvcount
332       3) 0 is the root node
333       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
334       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
335   */
336   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
337   recv_size  = parse_double(action[2 + comm_size]);
338   disps      = std::vector<int>(comm_size, 0);
339   sendcounts = std::make_shared<std::vector<int>>(comm_size);
340
341   if (action.size() > 5 + comm_size)
342     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
343   if (action.size() > 5 + comm_size)
344     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
345
346   for (unsigned int i = 0; i < comm_size; i++) {
347     (*sendcounts)[i] = std::stoi(action[i + 2]);
348   }
349   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
350   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
351 }
352
353 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
354 {
355   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
356        0 reducescatter 275427 275427 275427 204020 11346849 0
357      where:
358        1) The first four values after the name of the action declare the recvcounts array
359        2) The value 11346849 is the amount of instructions
360        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
361   */
362   comm_size = MPI_COMM_WORLD->size();
363   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
364   comp_size  = parse_double(action[2 + comm_size]);
365   recvcounts = std::make_shared<std::vector<int>>(comm_size);
366   if (action.size() > 3 + comm_size)
367     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
368
369   for (unsigned int i = 0; i < comm_size; i++) {
370     recvcounts->push_back(std::stoi(action[i + 2]));
371   }
372   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
373 }
374
375 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
376 {
377   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
378         0 alltoallv 100 1 7 10 12 100 1 70 10 5
379      where:
380       1) 100 is the size of the send buffer *sizeof(int),
381       2) 1 7 10 12 is the sendcounts array
382       3) 100*sizeof(int) is the size of the receiver buffer
383       4)  1 70 10 5 is the recvcounts array
384   */
385   comm_size = MPI_COMM_WORLD->size();
386   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
387   sendcounts = std::make_shared<std::vector<int>>(comm_size);
388   recvcounts = std::make_shared<std::vector<int>>(comm_size);
389   senddisps  = std::vector<int>(comm_size, 0);
390   recvdisps  = std::vector<int>(comm_size, 0);
391
392   if (action.size() > 5 + 2 * comm_size)
393     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
394   if (action.size() > 5 + 2 * comm_size)
395     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
396
397   send_buf_size = parse_double(action[2]);
398   recv_buf_size = parse_double(action[3 + comm_size]);
399   for (unsigned int i = 0; i < comm_size; i++) {
400     (*sendcounts)[i] = std::stoi(action[3 + i]);
401     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
402   }
403   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
404   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
405 }
406
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
408 {
409   std::string s = boost::algorithm::join(action, " ");
410   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411   const WaitTestParser& args = get_args();
412   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413   req_storage.remove(request);
414
415   if (request == MPI_REQUEST_NULL) {
416     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
417      * return.*/
418     return;
419   }
420
421   aid_t rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
422
423   // Must be taken before Request::wait() since the request may be set to
424   // MPI_REQUEST_NULL by Request::wait!
425   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
426   // TODO: Here we take the rank while we normally take the process id (look for get_pid())
427   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
428
429   MPI_Status status;
430   Request::wait(&request, &status);
431
432   TRACE_smpi_comm_out(rank);
433   if (is_wait_for_receive)
434     TRACE_smpi_recv(args.src, args.dst, args.tag);
435 }
436
437 void SendAction::kernel(simgrid::xbt::ReplayAction&)
438 {
439   const SendRecvParser& args = get_args();
440   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
441
442   TRACE_smpi_comm_in(
443       get_pid(), __func__,
444       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
445   if (not TRACE_smpi_view_internals())
446     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
447
448   if (get_name() == "send") {
449     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450   } else if (get_name() == "isend") {
451     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
452     req_storage.add(request);
453   } else {
454     xbt_die("Don't know this action, %s", get_name().c_str());
455   }
456
457   TRACE_smpi_comm_out(get_pid());
458 }
459
460 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
461 {
462   const SendRecvParser& args = get_args();
463   TRACE_smpi_comm_in(
464       get_pid(), __func__,
465       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
466
467   MPI_Status status;
468   // unknown size from the receiver point of view
469   double arg_size = args.size;
470   if (arg_size <= 0.0) {
471     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
472     arg_size = status.count;
473   }
474
475   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
476   if (get_name() == "recv") {
477     is_recv = true;
478     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
479   } else if (get_name() == "irecv") {
480     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
481     req_storage.add(request);
482   } else {
483     THROW_IMPOSSIBLE;
484   }
485
486   TRACE_smpi_comm_out(get_pid());
487   if (is_recv && not TRACE_smpi_view_internals()) {
488     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
489     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
490   }
491 }
492
493 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
494 {
495   const ComputeParser& args = get_args();
496   if (smpi_cfg_simulate_computation()) {
497     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
498   }
499 }
500
501 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
502 {
503   const SleepParser& args = get_args();
504   XBT_DEBUG("Sleep for: %lf secs", args.time);
505   aid_t pid = simgrid::s4u::this_actor::get_pid();
506   TRACE_smpi_sleeping_in(pid, args.time);
507   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
508   TRACE_smpi_sleeping_out(pid);
509 }
510
511 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
512 {
513   const LocationParser& args = get_args();
514   smpi_trace_set_call_location(args.filename.c_str(), args.line);
515 }
516
517 void TestAction::kernel(simgrid::xbt::ReplayAction&)
518 {
519   const WaitTestParser& args = get_args();
520   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
521   req_storage.remove(request);
522   // if request is null here, this may mean that a previous test has succeeded
523   // Different times in traced application and replayed version may lead to this
524   // In this case, ignore the extra calls.
525   if (request != MPI_REQUEST_NULL) {
526     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
527
528     MPI_Status status;
529     int flag = 0;
530     Request::test(&request, &status, &flag);
531
532     XBT_DEBUG("MPI_Test result: %d", flag);
533     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
534      * nullptr.*/
535     if (request == MPI_REQUEST_NULL)
536       req_storage.addNullRequest(args.src, args.dst, args.tag);
537     else
538       req_storage.add(request);
539
540     TRACE_smpi_comm_out(get_pid());
541   }
542 }
543
544 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
545 {
546   CHECK_ACTION_PARAMS(action, 0, 1)
547     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
548     : MPI_BYTE;  // default TAU datatype
549
550   /* start a simulated timer */
551   smpi_process()->simulated_start();
552 }
553
554 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
555 {
556   /* nothing to do */
557 }
558
559 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
560 {
561   const size_t count_requests = req_storage.size();
562
563   if (count_requests > 0) {
564     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
565     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
566     std::vector<MPI_Request> reqs;
567     req_storage.get_requests(reqs);
568     for (auto const& req : reqs) {
569       if (req && (req->flags() & MPI_REQ_RECV)) {
570         sender_receiver.emplace_back(req->src(), req->dst());
571       }
572     }
573     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
574     req_storage.get_store().clear();
575
576     for (auto const& pair : sender_receiver) {
577       TRACE_smpi_recv(pair.first, pair.second, 0);
578     }
579     TRACE_smpi_comm_out(get_pid());
580   }
581 }
582
583 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
584 {
585   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
586   colls::barrier(MPI_COMM_WORLD);
587   TRACE_smpi_comm_out(get_pid());
588 }
589
590 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
591 {
592   const BcastArgParser& args = get_args();
593   TRACE_smpi_comm_in(get_pid(), "action_bcast",
594                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root), -1.0, args.size,
595                                                     0, Datatype::encode(args.datatype1), ""));
596
597   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
598
599   TRACE_smpi_comm_out(get_pid());
600 }
601
602 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
603 {
604   const ReduceArgParser& args = get_args();
605   TRACE_smpi_comm_in(get_pid(), "action_reduce",
606                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root), args.comp_size,
607                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
608
609   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
610                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
611                 args.root, MPI_COMM_WORLD);
612   private_execute_flops(args.comp_size);
613
614   TRACE_smpi_comm_out(get_pid());
615 }
616
617 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
618 {
619   const AllReduceArgParser& args = get_args();
620   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
621                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
622                                                     Datatype::encode(args.datatype1), ""));
623
624   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
625                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
626                    MPI_COMM_WORLD);
627   private_execute_flops(args.comp_size);
628
629   TRACE_smpi_comm_out(get_pid());
630 }
631
632 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
633 {
634   const AllToAllArgParser& args = get_args();
635   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
636                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
637                                                     Datatype::encode(args.datatype1),
638                                                     Datatype::encode(args.datatype2)));
639
640   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
641                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
642                   MPI_COMM_WORLD);
643
644   TRACE_smpi_comm_out(get_pid());
645 }
646
647 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
648 {
649   const GatherArgParser& args = get_args();
650   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
651                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
652                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
653                                                     Datatype::encode(args.datatype2)));
654
655   if (get_name() == "gather") {
656     int rank = MPI_COMM_WORLD->rank();
657     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
658                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
659                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
660   } else
661     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
663                      MPI_COMM_WORLD);
664
665   TRACE_smpi_comm_out(get_pid());
666 }
667
668 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
669 {
670   int rank = MPI_COMM_WORLD->rank();
671   const GatherVArgParser& args = get_args();
672   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
673                      new simgrid::instr::VarCollTIData(
674                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
675                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
676
677   if (get_name() == "gatherv") {
678     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
679                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
680                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
681   } else {
682     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
683                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
684                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
685   }
686
687   TRACE_smpi_comm_out(get_pid());
688 }
689
690 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
691 {
692   int rank = MPI_COMM_WORLD->rank();
693   const ScatterArgParser& args = get_args();
694   TRACE_smpi_comm_in(get_pid(), "action_scatter",
695                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
696                                                     Datatype::encode(args.datatype1),
697                                                     Datatype::encode(args.datatype2)));
698
699   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
701                  args.datatype2, args.root, MPI_COMM_WORLD);
702
703   TRACE_smpi_comm_out(get_pid());
704 }
705
706 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
707 {
708   int rank = MPI_COMM_WORLD->rank();
709   const ScatterVArgParser& args = get_args();
710   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
711                      new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
712                                                        nullptr, Datatype::encode(args.datatype1),
713                                                        Datatype::encode(args.datatype2)));
714
715   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
716                   args.sendcounts->data(), args.disps.data(), args.datatype1,
717                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
718                   MPI_COMM_WORLD);
719
720   TRACE_smpi_comm_out(get_pid());
721 }
722
723 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
724 {
725   const ReduceScatterArgParser& args = get_args();
726   TRACE_smpi_comm_in(
727       get_pid(), "action_reducescatter",
728       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
729                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
730                                         Datatype::encode(args.datatype1)));
731
732   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
733                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
734                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
735
736   private_execute_flops(args.comp_size);
737   TRACE_smpi_comm_out(get_pid());
738 }
739
740 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
741 {
742   const AllToAllVArgParser& args = get_args();
743   TRACE_smpi_comm_in(get_pid(), __func__,
744                      new simgrid::instr::VarCollTIData(
745                          "alltoallv", 0, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
746                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
747
748   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
749                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
750                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
751
752   TRACE_smpi_comm_out(get_pid());
753 }
754 } // Replay Namespace
755 }} // namespace simgrid::smpi
756
757 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
758 /** @brief Only initialize the replay, don't do it for real */
759 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
760 {
761   xbt_assert(not smpi_process()->initializing());
762
763   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
764   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
765   simgrid::smpi::ActorExt::init();
766
767   smpi_process()->mark_as_initialized();
768   smpi_process()->set_replaying(true);
769
770   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
771   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
772   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
773   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
784   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
785   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
786   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
787   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
788   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
789   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
790   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
791   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
792   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
793   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
794   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
795   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
796   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
797   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
798   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
799
800   //if we have a delayed start, sleep here.
801   if (start_delay_flops > 0) {
802     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
803     private_execute_flops(start_delay_flops);
804   } else {
805     // Wait for the other actors to initialize also
806     simgrid::s4u::this_actor::yield();
807   }
808 }
809
810 /** @brief actually run the replay after initialization */
811 void smpi_replay_main(int rank, const char* private_trace_filename)
812 {
813   static int active_processes = 0;
814   active_processes++;
815   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
816   std::string rank_string                      = std::to_string(rank);
817   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
818
819   /* and now, finalize everything */
820   /* One active process will stop. Decrease the counter*/
821   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
822   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
823   if (count_requests > 0) {
824     std::vector<MPI_Request> requests(count_requests);
825     unsigned int i=0;
826
827     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
828       requests[i] = pair.second;
829       i++;
830     }
831     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
832   }
833   active_processes--;
834
835   if(active_processes==0){
836     /* Last process alive speaking: end the simulated timer */
837     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
838     smpi_free_replay_tmp_buffers();
839   }
840
841   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
842                      new simgrid::instr::NoOpTIData("finalize"));
843
844   smpi_process()->finalize();
845
846   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
847 }
848
849 /** @brief chain a replay initialization and a replay start */
850 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
851 {
852   smpi_replay_init(instance_id, rank, start_delay_flops);
853   smpi_replay_main(rank, private_trace_filename);
854 }