Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
previous ti-tracing improvement for bcast and reduce was not a good idea.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <cmath>
16 #include <limits>
17 #include <memory>
18 #include <numeric>
19 #include <tuple>
20 #include <unordered_map>
21 #include <vector>
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24
25 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
26 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
27 // this could go into a header file.
28 namespace hash_tuple {
29 template <typename TT> class hash {
30 public:
31   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
32 };
33
34 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
35 {
36   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
37 }
38
39 // Recursive template code derived from Matthieu M.
40 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
41 public:
42   static void apply(size_t& seed, Tuple const& tuple)
43   {
44     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
45     hash_combine(seed, std::get<Index>(tuple));
46   }
47 };
48
49 template <class Tuple> class HashValueImpl<Tuple, 0> {
50 public:
51   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
52 };
53
54 template <typename... TT> class hash<std::tuple<TT...>> {
55 public:
56   size_t operator()(std::tuple<TT...> const& tt) const
57   {
58     size_t seed = 0;
59     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
60     return seed;
61   }
62 };
63 }
64
65 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
66 {
67   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
68     std::string s = boost::algorithm::join(action, " ");
69     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
70   }
71 }
72
73 /* Helper functions */
74 static double parse_double(const std::string& string)
75 {
76   return xbt_str_parse_double(string.c_str(), "not a double");
77 }
78
79 template <typename T> static T parse_integer(const std::string& string)
80 {
81   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
82   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
83                  val <= static_cast<double>(std::numeric_limits<T>::max()),
84              "out of range: %g", val);
85   return static_cast<T>(val);
86 }
87
88 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
89 {
90   return i < action.size() ? std::stoi(action[i]) : 0;
91 }
92
93 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
94 {
95   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
96 }
97
98 namespace simgrid {
99 namespace smpi {
100
101 namespace replay {
102 MPI_Datatype MPI_DEFAULT_TYPE;
103
104 class RequestStorage {
105 private:
106   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
107   using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
108
109   req_storage_t store;
110
111 public:
112   RequestStorage() = default;
113   size_t size() const { return store.size(); }
114
115   req_storage_t& get_store() { return store; }
116
117   void get_requests(std::vector<MPI_Request>& vec) const
118   {
119     for (auto const& pair : store) {
120       auto& req       = pair.second;
121       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
122       if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
123         vec.push_back(pair.second);
124         pair.second->print_request("MM");
125       }
126     }
127   }
128
129     MPI_Request find(int src, int dst, int tag)
130     {
131       auto it = store.find(req_key_t(src, dst, tag));
132       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
133     }
134
135     void remove(const Request* req)
136     {
137       if (req == MPI_REQUEST_NULL) return;
138
139       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
140     }
141
142     void add(MPI_Request req)
143     {
144       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
145         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
146     }
147
148     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149     void addNullRequest(int src, int dst, int tag)
150     {
151       store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
152                     MPI_REQUEST_NULL});
153     }
154 };
155
156 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
157 {
158   CHECK_ACTION_PARAMS(action, 3, 0)
159   src = std::stoi(action[2]);
160   dst = std::stoi(action[3]);
161   tag = std::stoi(action[4]);
162 }
163
164 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
165 {
166   CHECK_ACTION_PARAMS(action, 3, 1)
167   partner = std::stoi(action[2]);
168   tag     = std::stoi(action[3]);
169   size      = parse_integer<size_t>(action[4]);
170   datatype1 = parse_datatype(action, 5);
171 }
172
173 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
174 {
175   CHECK_ACTION_PARAMS(action, 1, 0)
176   flops = parse_double(action[2]);
177 }
178
179 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
180 {
181   CHECK_ACTION_PARAMS(action, 1, 0)
182   time = parse_double(action[2]);
183 }
184
185 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
186 {
187   CHECK_ACTION_PARAMS(action, 2, 0)
188   filename = std::string(action[2]);
189   line = std::stoi(action[3]);
190 }
191
192 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
193 {
194   CHECK_ACTION_PARAMS(action, 1, 2)
195   size      = parse_integer<size_t>(action[2]);
196   root      = parse_root(action, 3);
197   datatype1 = parse_datatype(action, 4);
198 }
199
200 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 2)
203   comm_size = parse_integer<unsigned>(action[2]);
204   comp_size = parse_double(action[3]);
205   root      = parse_root(action, 4);
206   datatype1 = parse_datatype(action, 5);
207 }
208
209 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   comm_size = parse_integer<unsigned>(action[2]);
213   comp_size = parse_double(action[3]);
214   datatype1 = parse_datatype(action, 4);
215 }
216
217 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
218 {
219   CHECK_ACTION_PARAMS(action, 2, 1)
220   comm_size = MPI_COMM_WORLD->size();
221   send_size = parse_integer<int>(action[2]);
222   recv_size = parse_integer<int>(action[3]);
223   datatype1 = parse_datatype(action, 4);
224   datatype2 = parse_datatype(action, 5);
225 }
226
227 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
228 {
229   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
230         0 gather 68 68 0 0 0
231       where:
232         1) 68 is the sendcounts
233         2) 68 is the recvcounts
234         3) 0 is the root node
235         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
236         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
237   */
238   CHECK_ACTION_PARAMS(action, 2, 3)
239   comm_size = MPI_COMM_WORLD->size();
240   send_size = parse_integer<int>(action[2]);
241   recv_size = parse_integer<int>(action[3]);
242
243   if (name == "gather") {
244     root      = parse_root(action, 4);
245     datatype1 = parse_datatype(action, 5);
246     datatype2 = parse_datatype(action, 6);
247   } else {
248     root      = 0;
249     datatype1 = parse_datatype(action, 4);
250     datatype2 = parse_datatype(action, 5);
251   }
252 }
253
254 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
255 {
256   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
257        0 gather 68 68 10 10 10 0 0 0
258      where:
259        1) 68 is the sendcount
260        2) 68 10 10 10 is the recvcounts
261        3) 0 is the root node
262        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
263        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
264   */
265   comm_size = MPI_COMM_WORLD->size();
266   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
267   send_size  = parse_integer<int>(action[2]);
268   disps      = std::vector<int>(comm_size, 0);
269   recvcounts = std::make_shared<std::vector<int>>(comm_size);
270
271   if (name == "gatherv") {
272     root      = parse_root(action, 3 + comm_size);
273     datatype1 = parse_datatype(action, 4 + comm_size);
274     datatype2 = parse_datatype(action, 5 + comm_size);
275   } else {
276     root                = 0;
277     unsigned disp_index = 0;
278     /* The 3 comes from "0 gather <sendcount>", which must always be present.
279      * The + comm_size is the recvcounts array, which must also be present
280      */
281     if (action.size() > 3 + comm_size + comm_size) {
282       // datatype + disp are specified
283       datatype1  = parse_datatype(action, 3 + comm_size);
284       datatype2  = parse_datatype(action, 4 + comm_size);
285       disp_index = 5 + comm_size;
286     } else if (action.size() > 3 + comm_size + 2) {
287       // disps specified; datatype is not specified; use the default one
288       datatype1  = MPI_DEFAULT_TYPE;
289       datatype2  = MPI_DEFAULT_TYPE;
290       disp_index = 3 + comm_size;
291     } else {
292       // no disp specified, maybe only datatype,
293       datatype1 = parse_datatype(action, 3 + comm_size);
294       datatype2 = parse_datatype(action, 4 + comm_size);
295     }
296
297     if (disp_index != 0) {
298       xbt_assert(disp_index + comm_size <= action.size());
299       for (unsigned i = 0; i < comm_size; i++)
300         disps[i]          = std::stoi(action[disp_index + i]);
301     }
302   }
303
304   for (unsigned int i = 0; i < comm_size; i++) {
305     (*recvcounts)[i] = std::stoi(action[i + 3]);
306   }
307   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
308 }
309
310 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
311 {
312   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
313         0 gather 68 68 0 0 0
314       where:
315         1) 68 is the sendcounts
316         2) 68 is the recvcounts
317         3) 0 is the root node
318         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
319         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
320   */
321   CHECK_ACTION_PARAMS(action, 2, 3)
322   comm_size = MPI_COMM_WORLD->size();
323   send_size = parse_integer<int>(action[2]);
324   recv_size = parse_integer<int>(action[3]);
325   root      = parse_root(action, 4);
326   datatype1 = parse_datatype(action, 5);
327   datatype2 = parse_datatype(action, 6);
328 }
329
330 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
331 {
332   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
333      0 gather 68 10 10 10 68 0 0 0
334       where:
335       1) 68 10 10 10 is the sendcounts
336       2) 68 is the recvcount
337       3) 0 is the root node
338       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
339       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
340   */
341   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
342   recv_size  = parse_integer<int>(action[2 + comm_size]);
343   disps      = std::vector<int>(comm_size, 0);
344   sendcounts = std::make_shared<std::vector<int>>(comm_size);
345
346   root      = parse_root(action, 3 + comm_size);
347   datatype1 = parse_datatype(action, 4 + comm_size);
348   datatype2 = parse_datatype(action, 5 + comm_size);
349
350   for (unsigned int i = 0; i < comm_size; i++) {
351     (*sendcounts)[i] = std::stoi(action[i + 2]);
352   }
353   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
354 }
355
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
357 {
358   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359        0 reducescatter 275427 275427 275427 204020 11346849 0
360      where:
361        1) The first four values after the name of the action declare the recvcounts array
362        2) The value 11346849 is the amount of instructions
363        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
364   */
365   comm_size = MPI_COMM_WORLD->size();
366   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367   comp_size  = parse_double(action[2 + comm_size]);
368   recvcounts = std::make_shared<std::vector<int>>(comm_size);
369   datatype1  = parse_datatype(action, 3 + comm_size);
370
371   for (unsigned int i = 0; i < comm_size; i++) {
372     recvcounts->push_back(std::stoi(action[i + 2]));
373   }
374   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
375 }
376
377 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
378 {
379   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
380         0 alltoallv 100 1 7 10 12 100 1 70 10 5
381      where:
382       1) 100 is the size of the send buffer *sizeof(int),
383       2) 1 7 10 12 is the sendcounts array
384       3) 100*sizeof(int) is the size of the receiver buffer
385       4)  1 70 10 5 is the recvcounts array
386   */
387   comm_size = MPI_COMM_WORLD->size();
388   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
389   sendcounts = std::make_shared<std::vector<int>>(comm_size);
390   recvcounts = std::make_shared<std::vector<int>>(comm_size);
391   senddisps  = std::vector<int>(comm_size, 0);
392   recvdisps  = std::vector<int>(comm_size, 0);
393
394   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
395   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
396
397   send_buf_size = parse_integer<int>(action[2]);
398   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
399   for (unsigned int i = 0; i < comm_size; i++) {
400     (*sendcounts)[i] = std::stoi(action[3 + i]);
401     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
402   }
403   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
404   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
405 }
406
407 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
408 {
409   std::string s = boost::algorithm::join(action, " ");
410   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
411   const WaitTestParser& args = get_args();
412   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413   req_storage.remove(request);
414
415   if (request == MPI_REQUEST_NULL) {
416     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
417      * return.*/
418     return;
419   }
420
421   // Must be taken before Request::wait() since the request may be set to
422   // MPI_REQUEST_NULL by Request::wait!
423   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
424
425   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
426
427   MPI_Status status;
428   Request::wait(&request, &status);
429
430   TRACE_smpi_comm_out(get_pid());
431   if (is_wait_for_receive)
432     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
433 }
434
435 void SendAction::kernel(simgrid::xbt::ReplayAction&)
436 {
437   const SendRecvParser& args = get_args();
438   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
439
440   TRACE_smpi_comm_in(
441       get_pid(), __func__,
442       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
443   if (not TRACE_smpi_view_internals())
444     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
445
446   if (get_name() == "send") {
447     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
448   } else if (get_name() == "isend") {
449     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450     req_storage.add(request);
451   } else {
452     xbt_die("Don't know this action, %s", get_name().c_str());
453   }
454
455   TRACE_smpi_comm_out(get_pid());
456 }
457
458 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
459 {
460   const SendRecvParser& args = get_args();
461   TRACE_smpi_comm_in(
462       get_pid(), __func__,
463       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
464
465   MPI_Status status;
466   // unknown size from the receiver point of view
467   size_t arg_size = args.size;
468   if (arg_size == 0) {
469     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
470     arg_size = status.count;
471   }
472
473   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
474   if (get_name() == "recv") {
475     is_recv = true;
476     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
477   } else if (get_name() == "irecv") {
478     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
479     req_storage.add(request);
480   } else {
481     THROW_IMPOSSIBLE;
482   }
483
484   TRACE_smpi_comm_out(get_pid());
485   if (is_recv && not TRACE_smpi_view_internals()) {
486     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
487     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
488   }
489 }
490
491 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
492 {
493   const ComputeParser& args = get_args();
494   if (smpi_cfg_simulate_computation()) {
495     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
496   }
497 }
498
499 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
500 {
501   const SleepParser& args = get_args();
502   XBT_DEBUG("Sleep for: %lf secs", args.time);
503   aid_t pid = simgrid::s4u::this_actor::get_pid();
504   TRACE_smpi_sleeping_in(pid, args.time);
505   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
506   TRACE_smpi_sleeping_out(pid);
507 }
508
509 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
510 {
511   const LocationParser& args = get_args();
512   smpi_trace_set_call_location(args.filename.c_str(), args.line);
513 }
514
515 void TestAction::kernel(simgrid::xbt::ReplayAction&)
516 {
517   const WaitTestParser& args = get_args();
518   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
519   req_storage.remove(request);
520   // if request is null here, this may mean that a previous test has succeeded
521   // Different times in traced application and replayed version may lead to this
522   // In this case, ignore the extra calls.
523   if (request != MPI_REQUEST_NULL) {
524     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
525
526     MPI_Status status;
527     int flag = 0;
528     Request::test(&request, &status, &flag);
529
530     XBT_DEBUG("MPI_Test result: %d", flag);
531     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
532      * nullptr.*/
533     if (request == MPI_REQUEST_NULL)
534       req_storage.addNullRequest(args.src, args.dst, args.tag);
535     else
536       req_storage.add(request);
537
538     TRACE_smpi_comm_out(get_pid());
539   }
540 }
541
542 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
543 {
544   CHECK_ACTION_PARAMS(action, 0, 1)
545     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
546     : MPI_BYTE;  // default TAU datatype
547
548   /* start a simulated timer */
549   smpi_process()->simulated_start();
550 }
551
552 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
553 {
554   /* nothing to do */
555 }
556
557 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
558 {
559   const size_t count_requests = req_storage.size();
560
561   if (count_requests > 0) {
562     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
563     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
564     std::vector<MPI_Request> reqs;
565     req_storage.get_requests(reqs);
566     for (auto const& req : reqs) {
567       if (req && (req->flags() & MPI_REQ_RECV)) {
568         sender_receiver.emplace_back(req->src(), req->dst());
569       }
570     }
571     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
572     req_storage.get_store().clear();
573
574     for (auto const& pair : sender_receiver) {
575       TRACE_smpi_recv(pair.first, pair.second, 0);
576     }
577     TRACE_smpi_comm_out(get_pid());
578   }
579 }
580
581 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
582 {
583   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
584   colls::barrier(MPI_COMM_WORLD);
585   TRACE_smpi_comm_out(get_pid());
586 }
587
588 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
589 {
590   const BcastArgParser& args = get_args();
591   TRACE_smpi_comm_in(get_pid(), "action_bcast",
592                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
593                                                     0, Datatype::encode(args.datatype1), ""));
594
595   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
596
597   TRACE_smpi_comm_out(get_pid());
598 }
599
600 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
601 {
602   const ReduceArgParser& args = get_args();
603   TRACE_smpi_comm_in(get_pid(), "action_reduce",
604                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
605                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
606
607   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
608                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
609                 args.root, MPI_COMM_WORLD);
610   if(args.comp_size != 0.0)
611     private_execute_flops(args.comp_size);
612
613   TRACE_smpi_comm_out(get_pid());
614 }
615
616 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
617 {
618   const AllReduceArgParser& args = get_args();
619   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
620                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
621                                                     Datatype::encode(args.datatype1), ""));
622
623   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
624                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
625                    MPI_COMM_WORLD);
626   if(args.comp_size != 0.0)
627     private_execute_flops(args.comp_size);
628
629   TRACE_smpi_comm_out(get_pid());
630 }
631
632 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
633 {
634   const AllToAllArgParser& args = get_args();
635   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
636                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
637                                                     Datatype::encode(args.datatype1),
638                                                     Datatype::encode(args.datatype2)));
639
640   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
641                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
642                   MPI_COMM_WORLD);
643
644   TRACE_smpi_comm_out(get_pid());
645 }
646
647 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
648 {
649   const GatherArgParser& args = get_args();
650   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
651                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
652                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
653                                                     Datatype::encode(args.datatype2)));
654
655   if (get_name() == "gather") {
656     int rank = MPI_COMM_WORLD->rank();
657     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
658                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
659                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
660   } else
661     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
663                      MPI_COMM_WORLD);
664
665   TRACE_smpi_comm_out(get_pid());
666 }
667
668 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
669 {
670   int rank = MPI_COMM_WORLD->rank();
671   const GatherVArgParser& args = get_args();
672   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
673                      new simgrid::instr::VarCollTIData(
674                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, 0,
675                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
676
677   if (get_name() == "gatherv") {
678     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
679                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
680                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
681   } else {
682     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
683                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
684                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
685   }
686
687   TRACE_smpi_comm_out(get_pid());
688 }
689
690 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
691 {
692   int rank = MPI_COMM_WORLD->rank();
693   const ScatterArgParser& args = get_args();
694   TRACE_smpi_comm_in(get_pid(), "action_scatter",
695                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
696                                                     Datatype::encode(args.datatype1),
697                                                     Datatype::encode(args.datatype2)));
698
699   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
701                  args.datatype2, args.root, MPI_COMM_WORLD);
702
703   TRACE_smpi_comm_out(get_pid());
704 }
705
706 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
707 {
708   int rank = MPI_COMM_WORLD->rank();
709   const ScatterVArgParser& args = get_args();
710   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
711                      new simgrid::instr::VarCollTIData(get_name(), args.root, 0, args.sendcounts, args.recv_size,
712                                                        nullptr, Datatype::encode(args.datatype1),
713                                                        Datatype::encode(args.datatype2)));
714
715   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
716                   args.sendcounts->data(), args.disps.data(), args.datatype1,
717                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
718                   MPI_COMM_WORLD);
719
720   TRACE_smpi_comm_out(get_pid());
721 }
722
723 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
724 {
725   const ReduceScatterArgParser& args = get_args();
726   TRACE_smpi_comm_in(
727       get_pid(), "action_reducescatter",
728       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, 0, args.recvcounts,
729                                         std::to_string(args.comp_size), /* ugly hack to print comp_size */
730                                         Datatype::encode(args.datatype1)));
731
732   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
733                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
734                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
735
736   private_execute_flops(args.comp_size);
737   TRACE_smpi_comm_out(get_pid());
738 }
739
740 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
741 {
742   const AllToAllVArgParser& args = get_args();
743   TRACE_smpi_comm_in(get_pid(), __func__,
744                      new simgrid::instr::VarCollTIData(
745                          "alltoallv", 0, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
746                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
747
748   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
749                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
750                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
751
752   TRACE_smpi_comm_out(get_pid());
753 }
754 } // Replay Namespace
755 }} // namespace simgrid::smpi
756
757 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
758 /** @brief Only initialize the replay, don't do it for real */
759 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
760 {
761   xbt_assert(not smpi_process()->initializing());
762
763   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
764   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
765   simgrid::smpi::ActorExt::init();
766
767   smpi_process()->mark_as_initialized();
768   smpi_process()->set_replaying(true);
769
770   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
771   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
772   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
773   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
774   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
775   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
776   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
779   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
780   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
781   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
782   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
783   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
784   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
785   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
786   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
787   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
788   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
789   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
790   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
791   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
792   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
793   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
794   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
795   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
796   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
797   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
798   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
799
800   //if we have a delayed start, sleep here.
801   if (start_delay_flops > 0) {
802     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
803     private_execute_flops(start_delay_flops);
804   } else {
805     // Wait for the other actors to initialize also
806     simgrid::s4u::this_actor::yield();
807   }
808 }
809
810 /** @brief actually run the replay after initialization */
811 void smpi_replay_main(int rank, const char* private_trace_filename)
812 {
813   static int active_processes = 0;
814   active_processes++;
815   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
816   std::string rank_string                      = std::to_string(rank);
817   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
818
819   /* and now, finalize everything */
820   /* One active process will stop. Decrease the counter*/
821   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
822   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
823   if (count_requests > 0) {
824     std::vector<MPI_Request> requests(count_requests);
825     unsigned int i=0;
826
827     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
828       requests[i] = pair.second;
829       i++;
830     }
831     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
832   }
833
834   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
835     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
836
837   active_processes--;
838
839   if(active_processes==0){
840     /* Last process alive speaking: end the simulated timer */
841     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
842     smpi_free_replay_tmp_buffers();
843   }
844
845   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
846                      new simgrid::instr::NoOpTIData("finalize"));
847
848   smpi_process()->finalize();
849
850   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
851 }
852
853 /** @brief chain a replay initialization and a replay start */
854 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
855 {
856   smpi_replay_init(instance_id, rank, start_delay_flops);
857   smpi_replay_main(rank, private_trace_filename);
858 }