Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'adegomme-master-patch-16995' into 'master'
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16 #include "xbt/str.h"
17
18 #include <cmath>
19 #include <limits>
20 #include <memory>
21 #include <numeric>
22 #include <tuple>
23 #include <unordered_map>
24 #include <vector>
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
27 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
28 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
29 // this could go into a header file.
30 namespace hash_tuple {
31 template <typename TT> class hash {
32 public:
33   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
34 };
35
36 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
37 {
38   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
39 }
40
41 // Recursive template code derived from Matthieu M.
42 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
43 public:
44   static void apply(size_t& seed, Tuple const& tuple)
45   {
46     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
47     hash_combine(seed, std::get<Index>(tuple));
48   }
49 };
50
51 template <class Tuple> class HashValueImpl<Tuple, 0> {
52 public:
53   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
54 };
55
56 template <typename... TT> class hash<std::tuple<TT...>> {
57 public:
58   size_t operator()(std::tuple<TT...> const& tt) const
59   {
60     size_t seed = 0;
61     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
62     return seed;
63   }
64 };
65 }
66
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper functions */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "not a double");
79 }
80
81 template <typename T> static T parse_integer(const std::string& string)
82 {
83   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
84   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
85                  val <= static_cast<double>(std::numeric_limits<T>::max()),
86              "out of range: %g", val);
87   return static_cast<T>(val);
88 }
89
90 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
91 {
92   return i < action.size() ? std::stoi(action[i]) : 0;
93 }
94
95 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
96 {
97   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
98 }
99
100 namespace simgrid::smpi::replay {
101 MPI_Datatype MPI_DEFAULT_TYPE;
102
103 class RequestStorage {
104 private:
105   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
106   using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
107
108   req_storage_t store;
109
110 public:
111   RequestStorage() = default;
112   size_t size() const { return store.size(); }
113
114   req_storage_t& get_store() { return store; }
115
116   void get_requests(std::vector<MPI_Request>& vec) const
117   {
118     for (auto const& [_, reqs] : store) {
119       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
120       for (auto& req: reqs){
121         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
122           vec.push_back(req);
123           req->print_request("MM");
124         }
125       }
126     }
127   }
128
129   MPI_Request pop(int src, int dst, int tag)
130   {
131     auto it = store.find(req_key_t(src, dst, tag));
132     if (it == store.end())
133      return MPI_REQUEST_NULL;
134     MPI_Request req = it->second.front();
135     it->second.pop_front();
136     if(it->second.empty())
137       store.erase(req_key_t(src, dst, tag));
138     return req;
139   }
140
141   void add(MPI_Request req)
142   {
143     if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
144       store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
145     }
146   }
147
148   /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
149   void addNullRequest(int src, int dst, int tag)
150   {
151     int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
152     int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
153     store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
154   }
155 };
156
157 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
158 {
159   CHECK_ACTION_PARAMS(action, 3, 0)
160   src = std::stoi(action[2]);
161   dst = std::stoi(action[3]);
162   tag = std::stoi(action[4]);
163 }
164
165 void SendOrRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
166 {
167   CHECK_ACTION_PARAMS(action, 3, 1)
168   partner = std::stoi(action[2]);
169   tag     = std::stoi(action[3]);
170   size      = parse_integer<ssize_t>(action[4]);
171   datatype1 = parse_datatype(action, 5);
172 }
173
174 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 1, 0)
177   flops = parse_double(action[2]);
178 }
179
180 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
181 {
182   CHECK_ACTION_PARAMS(action, 1, 0)
183   time = parse_double(action[2]);
184 }
185
186 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
187 {
188   CHECK_ACTION_PARAMS(action, 2, 0)
189   filename = std::string(action[2]);
190   line = std::stoi(action[3]);
191 }
192
193 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 {
195   CHECK_ACTION_PARAMS(action, 6, 0)
196   sendcount = parse_integer<int>(action[2]);
197   dst = std::stoi(action[3]);
198   recvcount = parse_integer<int>(action[4]);
199   src = std::stoi(action[5]);
200   datatype1 = parse_datatype(action, 6);
201   datatype2 = parse_datatype(action, 7);
202 }
203
204 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
205 {
206   CHECK_ACTION_PARAMS(action, 1, 2)
207   size      = parse_integer<size_t>(action[2]);
208   root      = parse_root(action, 3);
209   datatype1 = parse_datatype(action, 4);
210 }
211
212 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
213 {
214   CHECK_ACTION_PARAMS(action, 2, 2)
215   comm_size = parse_integer<unsigned>(action[2]);
216   comp_size = parse_double(action[3]);
217   root      = parse_root(action, 4);
218   datatype1 = parse_datatype(action, 5);
219 }
220
221 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
222 {
223   CHECK_ACTION_PARAMS(action, 2, 1)
224   comm_size = parse_integer<unsigned>(action[2]);
225   comp_size = parse_double(action[3]);
226   datatype1 = parse_datatype(action, 4);
227 }
228
229 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
230 {
231   CHECK_ACTION_PARAMS(action, 2, 1)
232   comm_size = MPI_COMM_WORLD->size();
233   send_size = parse_integer<int>(action[2]);
234   recv_size = parse_integer<int>(action[3]);
235   datatype1 = parse_datatype(action, 4);
236   datatype2 = parse_datatype(action, 5);
237 }
238
239 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
240 {
241   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
242         0 gather 68 68 0 0 0
243       where:
244         1) 68 is the sendcounts
245         2) 68 is the recvcounts
246         3) 0 is the root node
247         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
249   */
250   CHECK_ACTION_PARAMS(action, 2, 3)
251   comm_size = MPI_COMM_WORLD->size();
252   send_size = parse_integer<int>(action[2]);
253   recv_size = parse_integer<int>(action[3]);
254
255   if (name == "gather") {
256     root      = parse_root(action, 4);
257     datatype1 = parse_datatype(action, 5);
258     datatype2 = parse_datatype(action, 6);
259   } else {
260     root      = 0;
261     datatype1 = parse_datatype(action, 4);
262     datatype2 = parse_datatype(action, 5);
263   }
264 }
265
266 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
267 {
268   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
269        0 gather 68 68 10 10 10 0 0 0
270      where:
271        1) 68 is the sendcount
272        2) 68 10 10 10 is the recvcounts
273        3) 0 is the root node
274        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
275        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
276   */
277   comm_size = MPI_COMM_WORLD->size();
278   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
279   send_size  = parse_integer<int>(action[2]);
280   disps      = std::vector<int>(comm_size, 0);
281   recvcounts = std::make_shared<std::vector<int>>(comm_size);
282
283   if (name == "gatherv") {
284     root      = parse_root(action, 3 + comm_size);
285     datatype1 = parse_datatype(action, 4 + comm_size);
286     datatype2 = parse_datatype(action, 5 + comm_size);
287   } else {
288     root                = 0;
289     unsigned disp_index = 0;
290     /* The 3 comes from "0 gather <sendcount>", which must always be present.
291      * The + comm_size is the recvcounts array, which must also be present
292      */
293     if (action.size() > 3 + comm_size + comm_size) {
294       // datatype + disp are specified
295       datatype1  = parse_datatype(action, 3 + comm_size);
296       datatype2  = parse_datatype(action, 4 + comm_size);
297       disp_index = 5 + comm_size;
298     } else if (action.size() > 3 + comm_size + 2) {
299       // disps specified; datatype is not specified; use the default one
300       datatype1  = MPI_DEFAULT_TYPE;
301       datatype2  = MPI_DEFAULT_TYPE;
302       disp_index = 3 + comm_size;
303     } else {
304       // no disp specified, maybe only datatype,
305       datatype1 = parse_datatype(action, 3 + comm_size);
306       datatype2 = parse_datatype(action, 4 + comm_size);
307     }
308
309     if (disp_index != 0) {
310       xbt_assert(disp_index + comm_size <= action.size());
311       for (unsigned i = 0; i < comm_size; i++)
312         disps[i]          = std::stoi(action[disp_index + i]);
313     }
314   }
315
316   for (unsigned int i = 0; i < comm_size; i++) {
317     (*recvcounts)[i] = std::stoi(action[i + 3]);
318   }
319   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
320 }
321
322 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
323 {
324   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
325         0 gather 68 68 0 0 0
326       where:
327         1) 68 is the sendcounts
328         2) 68 is the recvcounts
329         3) 0 is the root node
330         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
331         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
332   */
333   comm_size = MPI_COMM_WORLD->size();
334   CHECK_ACTION_PARAMS(action, 2, 3)
335   comm_size = MPI_COMM_WORLD->size();
336   send_size = parse_integer<int>(action[2]);
337   recv_size = parse_integer<int>(action[3]);
338   root      = parse_root(action, 4);
339   datatype1 = parse_datatype(action, 5);
340   datatype2 = parse_datatype(action, 6);
341 }
342
343 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
344 {
345   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
346      0 gather 68 10 10 10 68 0 0 0
347       where:
348       1) 68 10 10 10 is the sendcounts
349       2) 68 is the recvcount
350       3) 0 is the root node
351       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
352       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
353   */
354   comm_size = MPI_COMM_WORLD->size();
355   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
356   recv_size  = parse_integer<int>(action[2 + comm_size]);
357   disps      = std::vector<int>(comm_size, 0);
358   sendcounts = std::make_shared<std::vector<int>>(comm_size);
359
360   root      = parse_root(action, 3 + comm_size);
361   datatype1 = parse_datatype(action, 4 + comm_size);
362   datatype2 = parse_datatype(action, 5 + comm_size);
363
364   for (unsigned int i = 0; i < comm_size; i++) {
365     (*sendcounts)[i] = std::stoi(action[i + 2]);
366   }
367   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
368 }
369
370 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
371 {
372   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
373        0 reducescatter 275427 275427 275427 204020 11346849 0
374      where:
375        1) The first four values after the name of the action declare the recvcounts array
376        2) The value 11346849 is the amount of instructions
377        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
378   */
379   comm_size = MPI_COMM_WORLD->size();
380   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
381   comp_size  = parse_double(action[2 + comm_size]);
382   recvcounts = std::make_shared<std::vector<int>>(comm_size);
383   datatype1  = parse_datatype(action, 3 + comm_size);
384
385   for (unsigned int i = 0; i < comm_size; i++) {
386     (*recvcounts)[i]= std::stoi(action[i + 2]);
387   }
388   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
389 }
390
391 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
392 {
393   CHECK_ACTION_PARAMS(action, 2, 1)
394   size      = parse_integer<size_t>(action[2]);
395   comp_size = parse_double(action[3]);
396   datatype1 = parse_datatype(action, 4);
397 }
398
399 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
400 {
401   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
402         0 alltoallv 100 1 7 10 12 100 1 70 10 5
403      where:
404       1) 100 is the size of the send buffer *sizeof(int),
405       2) 1 7 10 12 is the sendcounts array
406       3) 100*sizeof(int) is the size of the receiver buffer
407       4)  1 70 10 5 is the recvcounts array
408   */
409   comm_size = MPI_COMM_WORLD->size();
410   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
411   sendcounts = std::make_shared<std::vector<int>>(comm_size);
412   recvcounts = std::make_shared<std::vector<int>>(comm_size);
413   senddisps  = std::vector<int>(comm_size, 0);
414   recvdisps  = std::vector<int>(comm_size, 0);
415
416   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
417   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
418
419   send_buf_size = parse_integer<int>(action[2]);
420   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
421   for (unsigned int i = 0; i < comm_size; i++) {
422     (*sendcounts)[i] = std::stoi(action[3 + i]);
423     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
424   }
425   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
426   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
427 }
428
429 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
430 {
431   std::string s = boost::algorithm::join(action, " ");
432   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
433   const WaitTestParser& args = get_args();
434   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
435
436   if (request == MPI_REQUEST_NULL) {
437     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
438      * return.*/
439     return;
440   }
441
442   // Must be taken before Request::wait() since the request may be set to
443   // MPI_REQUEST_NULL by Request::wait!
444   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
445
446   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
447
448   MPI_Status status;
449   Request::wait(&request, &status);
450   if(request!=MPI_REQUEST_NULL)
451     Request::unref(&request);
452   TRACE_smpi_comm_out(get_pid());
453   if (is_wait_for_receive)
454     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
455 }
456
457 void SendAction::kernel(simgrid::xbt::ReplayAction&)
458 {
459   const SendOrRecvParser& args = get_args();
460   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
461
462   TRACE_smpi_comm_in(
463       get_pid(), __func__,
464       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
465   if (not TRACE_smpi_view_internals())
466     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
467
468   if (get_name() == "send") {
469     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
470   } else if (get_name() == "isend") {
471     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
472     req_storage.add(request);
473   } else {
474     xbt_die("Don't know this action, %s", get_name().c_str());
475   }
476
477   TRACE_smpi_comm_out(get_pid());
478 }
479
480 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
481 {
482   const SendOrRecvParser& args = get_args();
483   TRACE_smpi_comm_in(
484       get_pid(), __func__,
485       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
486
487   MPI_Status status;
488   // unknown size from the receiver point of view
489   ssize_t arg_size = args.size;
490   if (arg_size < 0) {
491     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
492     arg_size = status.count;
493   }
494
495   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
496   if (get_name() == "recv") {
497     is_recv = true;
498     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
499   } else if (get_name() == "irecv") {
500     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
501     req_storage.add(request);
502   } else {
503     THROW_IMPOSSIBLE;
504   }
505
506   TRACE_smpi_comm_out(get_pid());
507   if (is_recv && not TRACE_smpi_view_internals()) {
508     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
509     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
510   }
511 }
512
513 void SendRecvAction::kernel(simgrid::xbt::ReplayAction&)
514 {
515   XBT_DEBUG("Enters SendRecv");
516   const SendRecvParser& args = get_args();
517   aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
518   aid_t src_traced = MPI_COMM_WORLD->group()->actor(args.src);
519   aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.dst);
520
521   MPI_Status status;
522   int sendtag=0;
523   int recvtag=0;
524
525   // FIXME: Hack the way to trace this one
526   auto dst_hack = std::make_shared<std::vector<int>>();
527   auto src_hack = std::make_shared<std::vector<int>>();
528   dst_hack->push_back(dst_traced);
529   src_hack->push_back(src_traced);
530   TRACE_smpi_comm_in(my_proc_id, __func__,
531                        new simgrid::instr::VarCollTIData(
532                            "sendRecv", -1, args.sendcount,
533                          dst_hack, args.recvcount, src_hack,
534                            simgrid::smpi::Datatype::encode(args.datatype1), simgrid::smpi::Datatype::encode(args.datatype2)));
535
536   TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, sendtag, args.sendcount * args.datatype1->size());
537
538   simgrid::smpi::Request::sendrecv(nullptr, args.sendcount, args.datatype1, args.dst, sendtag, nullptr, args.recvcount, args.datatype2, args.src,
539                                      recvtag, MPI_COMM_WORLD, &status);
540
541   TRACE_smpi_recv(src_traced, my_proc_id, recvtag);
542   TRACE_smpi_comm_out(my_proc_id);
543   XBT_DEBUG("Exits SendRecv");
544
545 }
546
547 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
548 {
549   const ComputeParser& args = get_args();
550   if (smpi_cfg_simulate_computation()) {
551     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
552   }
553 }
554
555 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
556 {
557   const SleepParser& args = get_args();
558   XBT_DEBUG("Sleep for: %lf secs", args.time);
559   aid_t pid = simgrid::s4u::this_actor::get_pid();
560   TRACE_smpi_sleeping_in(pid, args.time);
561   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
562   TRACE_smpi_sleeping_out(pid);
563 }
564
565 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
566 {
567   const LocationParser& args = get_args();
568   smpi_trace_set_call_location(args.filename.c_str(), args.line);
569 }
570
571 void TestAction::kernel(simgrid::xbt::ReplayAction&)
572 {
573   const WaitTestParser& args = get_args();
574   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
575   // if request is null here, this may mean that a previous test has succeeded
576   // Different times in traced application and replayed version may lead to this
577   // In this case, ignore the extra calls.
578   if (request != MPI_REQUEST_NULL) {
579     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
580
581     MPI_Status status;
582     int flag = 0;
583     Request::test(&request, &status, &flag);
584
585     XBT_DEBUG("MPI_Test result: %d", flag);
586     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
587      * nullptr.*/
588     if (request == MPI_REQUEST_NULL)
589       req_storage.addNullRequest(args.src, args.dst, args.tag);
590     else
591       req_storage.add(request);
592
593     TRACE_smpi_comm_out(get_pid());
594   }
595 }
596
597 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
598 {
599   CHECK_ACTION_PARAMS(action, 0, 1)
600     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
601     : MPI_BYTE;  // default TAU datatype
602
603   /* start a simulated timer */
604   smpi_process()->simulated_start();
605 }
606
607 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
608 {
609   /* nothing to do */
610 }
611
612 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
613 {
614   if (req_storage.size() > 0) {
615     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
616     std::vector<MPI_Request> reqs;
617     req_storage.get_requests(reqs);
618     unsigned long count_requests = reqs.size();
619     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
620     for (auto const& req : reqs) {
621       if (req && (req->flags() & MPI_REQ_RECV)) {
622         sender_receiver.emplace_back(req->src(), req->dst());
623       }
624     }
625     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
626     req_storage.get_store().clear();
627
628     for (MPI_Request& req : reqs)
629       if (req != MPI_REQUEST_NULL)
630         Request::unref(&req);
631
632     for (auto const& [src, dst] : sender_receiver) {
633       TRACE_smpi_recv(src, dst, 0);
634     }
635     TRACE_smpi_comm_out(get_pid());
636   }
637 }
638
639 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
640 {
641   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
642   colls::barrier(MPI_COMM_WORLD);
643   TRACE_smpi_comm_out(get_pid());
644 }
645
646 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
647 {
648   const BcastArgParser& args = get_args();
649   TRACE_smpi_comm_in(get_pid(), "action_bcast",
650                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
651                                                     0, Datatype::encode(args.datatype1), ""));
652
653   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
654
655   TRACE_smpi_comm_out(get_pid());
656 }
657
658 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
659 {
660   const ReduceArgParser& args = get_args();
661   TRACE_smpi_comm_in(get_pid(), "action_reduce",
662                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
663                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
664
665   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
666                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
667                 args.root, MPI_COMM_WORLD);
668   if (args.comp_size != 0.0)
669     simgrid::s4u::this_actor::exec_init(args.comp_size)
670       ->set_name("computation")
671       ->start()
672       ->wait();
673
674   TRACE_smpi_comm_out(get_pid());
675 }
676
677 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
678 {
679   const AllReduceArgParser& args = get_args();
680   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
681                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
682                                                     Datatype::encode(args.datatype1), ""));
683
684   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
685                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
686                    MPI_COMM_WORLD);
687   if (args.comp_size != 0.0)
688     simgrid::s4u::this_actor::exec_init(args.comp_size)
689       ->set_name("computation")
690       ->start()
691       ->wait();
692
693   TRACE_smpi_comm_out(get_pid());
694 }
695
696 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
697 {
698   const AllToAllArgParser& args = get_args();
699   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
700                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
701                                                     Datatype::encode(args.datatype1),
702                                                     Datatype::encode(args.datatype2)));
703
704   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
705                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
706                   MPI_COMM_WORLD);
707
708   TRACE_smpi_comm_out(get_pid());
709 }
710
711 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
712 {
713   const GatherArgParser& args = get_args();
714   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
715                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
716                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
717                                                     Datatype::encode(args.datatype2)));
718
719   if (get_name() == "gather") {
720     int rank = MPI_COMM_WORLD->rank();
721     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
722                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
723                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
724   } else
725     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
726                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
727                      MPI_COMM_WORLD);
728
729   TRACE_smpi_comm_out(get_pid());
730 }
731
732 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
733 {
734   int rank = MPI_COMM_WORLD->rank();
735   const GatherVArgParser& args = get_args();
736   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
737                      new simgrid::instr::VarCollTIData(
738                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
739                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
740
741   if (get_name() == "gatherv") {
742     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
743                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
744                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
745   } else {
746     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
747                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
748                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
749   }
750
751   TRACE_smpi_comm_out(get_pid());
752 }
753
754 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
755 {
756   int rank = MPI_COMM_WORLD->rank();
757   const ScatterArgParser& args = get_args();
758   TRACE_smpi_comm_in(get_pid(), "action_scatter",
759                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
760                                                     Datatype::encode(args.datatype1),
761                                                     Datatype::encode(args.datatype2)));
762
763   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
764                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
765                  args.datatype2, args.root, MPI_COMM_WORLD);
766
767   TRACE_smpi_comm_out(get_pid());
768 }
769
770 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
771 {
772   int rank = MPI_COMM_WORLD->rank();
773   const ScatterVArgParser& args = get_args();
774   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
775                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
776                                                        nullptr, Datatype::encode(args.datatype1),
777                                                        Datatype::encode(args.datatype2)));
778
779   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
780                   args.sendcounts->data(), args.disps.data(), args.datatype1,
781                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
782                   MPI_COMM_WORLD);
783
784   TRACE_smpi_comm_out(get_pid());
785 }
786
787 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
788 {
789   const ReduceScatterArgParser& args = get_args();
790   TRACE_smpi_comm_in(
791       get_pid(), "action_reducescatter",
792       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
793                                         /* ugly as we use datatype field to pass computation as string */
794                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
795                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
796                                         Datatype::encode(args.datatype1)));
797
798   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
799                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
800                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
801   if (args.comp_size != 0.0)
802     simgrid::s4u::this_actor::exec_init(args.comp_size)
803       ->set_name("computation")
804       ->start()
805       ->wait();
806   TRACE_smpi_comm_out(get_pid());
807 }
808
809 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
810 {
811   const ScanArgParser& args = get_args();
812   TRACE_smpi_comm_in(get_pid(), "action_scan",
813                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
814                      args.size, 0, Datatype::encode(args.datatype1), ""));
815   if (get_name() == "scan")
816     colls::scan(send_buffer(args.size * args.datatype1->size()),
817               recv_buffer(args.size * args.datatype1->size()), args.size,
818               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
819   else
820     colls::exscan(send_buffer(args.size * args.datatype1->size()),
821               recv_buffer(args.size * args.datatype1->size()), args.size,
822               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
823
824   if (args.comp_size != 0.0)
825     simgrid::s4u::this_actor::exec_init(args.comp_size)
826       ->set_name("computation")
827       ->start()
828       ->wait();
829   TRACE_smpi_comm_out(get_pid());
830 }
831
832 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
833 {
834   const AllToAllVArgParser& args = get_args();
835   TRACE_smpi_comm_in(get_pid(), __func__,
836                      new simgrid::instr::VarCollTIData(
837                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
838                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
839
840   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
841                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
842                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
843
844   TRACE_smpi_comm_out(get_pid());
845 }
846 } // namespace simgrid::smpi::replay
847
848 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
849 /** @brief Only initialize the replay, don't do it for real */
850 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
851 {
852   xbt_assert(not smpi_process()->initializing());
853
854   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
855   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
856   simgrid::smpi::ActorExt::init();
857
858   smpi_process()->mark_as_initialized();
859   smpi_process()->set_replaying(true);
860
861   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
862   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
863   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
864   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
865   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
866   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
867   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
868   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
869   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
870   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
871   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
872   xbt_replay_action_register("sendRecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendRecvAction().execute(action); });
873   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
874   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
875   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
876   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
877   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
878   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
879   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
880   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
881   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
882   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
883   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
884   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
885   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
886   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
887   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
888   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
889   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
890   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
891   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
892   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
893
894   //if we have a delayed start, sleep here.
895   if (start_delay_flops > 0) {
896     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
897     private_execute_flops(start_delay_flops);
898   } else {
899     // Wait for the other actors to initialize also
900     simgrid::s4u::this_actor::yield();
901   }
902   if(_smpi_init_sleep > 0)
903     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
904 }
905
906 /** @brief actually run the replay after initialization */
907 void smpi_replay_main(int rank, const char* private_trace_filename)
908 {
909   static int active_processes = 0;
910   active_processes++;
911   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
912   std::string rank_string                      = std::to_string(rank);
913   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
914
915   /* and now, finalize everything */
916   /* One active process will stop. Decrease the counter*/
917   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
918   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
919   if (count_requests > 0) {
920     std::vector<MPI_Request> requests(count_requests);
921     unsigned int i=0;
922
923     for (auto const& [_, reqs] : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
924       for (auto& req : reqs) {
925         requests[i] = req; // FIXME: overwritten at each iteration?
926       }
927       i++;
928     }
929     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
930   }
931
932   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
933     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
934
935   active_processes--;
936
937   if(active_processes==0){
938     /* Last process alive speaking: end the simulated timer */
939     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
940     smpi_free_replay_tmp_buffers();
941   }
942
943   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
944                      new simgrid::instr::NoOpTIData("finalize"));
945
946   smpi_process()->finalize();
947
948   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
949 }
950
951 /** @brief chain a replay initialization and a replay start */
952 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
953 {
954   smpi_replay_init(instance_id, rank, start_delay_flops);
955   smpi_replay_main(rank, private_trace_filename);
956 }