Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix waitall handling in SMPI replay.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2022. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_replay.hpp"
7 #include "simgrid/s4u/Exec.hpp"
8 #include "smpi_coll.hpp"
9 #include "smpi_comm.hpp"
10 #include "smpi_config.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_group.hpp"
13 #include "smpi_request.hpp"
14 #include "src/smpi/include/private.hpp"
15 #include "xbt/replay.hpp"
16
17 #include <cmath>
18 #include <limits>
19 #include <memory>
20 #include <numeric>
21 #include <tuple>
22 #include <unordered_map>
23 #include <vector>
24
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
26 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
27 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
28 // this could go into a header file.
29 namespace hash_tuple {
30 template <typename TT> class hash {
31 public:
32   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
33 };
34
35 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
36 {
37   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
38 }
39
40 // Recursive template code derived from Matthieu M.
41 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
42 public:
43   static void apply(size_t& seed, Tuple const& tuple)
44   {
45     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
46     hash_combine(seed, std::get<Index>(tuple));
47   }
48 };
49
50 template <class Tuple> class HashValueImpl<Tuple, 0> {
51 public:
52   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
53 };
54
55 template <typename... TT> class hash<std::tuple<TT...>> {
56 public:
57   size_t operator()(std::tuple<TT...> const& tt) const
58   {
59     size_t seed = 0;
60     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
61     return seed;
62   }
63 };
64 }
65
66 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
67 {
68   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
69     std::string s = boost::algorithm::join(action, " ");
70     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
71   }
72 }
73
74 /* Helper functions */
75 static double parse_double(const std::string& string)
76 {
77   return xbt_str_parse_double(string.c_str(), "not a double");
78 }
79
80 template <typename T> static T parse_integer(const std::string& string)
81 {
82   double val = trunc(xbt_str_parse_double(string.c_str(), "not a double"));
83   xbt_assert(static_cast<double>(std::numeric_limits<T>::min()) <= val &&
84                  val <= static_cast<double>(std::numeric_limits<T>::max()),
85              "out of range: %g", val);
86   return static_cast<T>(val);
87 }
88
89 static int parse_root(const simgrid::xbt::ReplayAction& action, unsigned i)
90 {
91   return i < action.size() ? std::stoi(action[i]) : 0;
92 }
93
94 static MPI_Datatype parse_datatype(const simgrid::xbt::ReplayAction& action, unsigned i)
95 {
96   return i < action.size() ? simgrid::smpi::Datatype::decode(action[i]) : simgrid::smpi::replay::MPI_DEFAULT_TYPE;
97 }
98
99 namespace simgrid {
100 namespace smpi {
101
102 namespace replay {
103 MPI_Datatype MPI_DEFAULT_TYPE;
104
105 class RequestStorage {
106 private:
107   using req_key_t     = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
108   using req_storage_t = std::unordered_map<req_key_t, std::list<MPI_Request>, hash_tuple::hash<std::tuple<int, int, int>>>;
109
110   req_storage_t store;
111
112 public:
113   RequestStorage() = default;
114   size_t size() const { return store.size(); }
115
116   req_storage_t& get_store() { return store; }
117
118   void get_requests(std::vector<MPI_Request>& vec) const
119   {
120     for (auto const& pair : store) {
121       auto& reqs       = pair.second;
122       aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
123       for (auto& req: reqs){
124         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
125           vec.push_back(req);
126           req->print_request("MM");
127         }
128       }
129     }
130   }
131
132   MPI_Request pop(int src, int dst, int tag)
133   {
134     auto it = store.find(req_key_t(src, dst, tag));
135     if (it == store.end())
136      return MPI_REQUEST_NULL;
137     MPI_Request req = it->second.front();
138     it->second.pop_front();
139     if(it->second.empty())
140       store.erase(req_key_t(src, dst, tag));
141     return req;
142   }
143
144   void add(MPI_Request req)
145   {
146     if (req != MPI_REQUEST_NULL){ // Can and does happen in the case of TestAction
147       auto it = store.find(req_key_t(req->src()-1, req->dst()-1, req->tag()));
148       if (it == store.end())
149         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), std::list<MPI_Request>()});
150       store[req_key_t(req->src()-1, req->dst()-1, req->tag())].push_back(req);
151     }
152   }
153
154   /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
155   void addNullRequest(int src, int dst, int tag)
156   {
157     int src_pid = MPI_COMM_WORLD->group()->actor(src) - 1;
158     int dest_pid = MPI_COMM_WORLD->group()->actor(dst) - 1;
159     auto it = store.find(req_key_t(src_pid, dest_pid, tag));
160     if (it == store.end())
161       store.insert({req_key_t(src_pid, dest_pid, tag), std::list<MPI_Request>()});
162     store[req_key_t(src_pid, dest_pid, tag)].push_back(MPI_REQUEST_NULL);
163   }
164 };
165
166 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
167 {
168   CHECK_ACTION_PARAMS(action, 3, 0)
169   src = std::stoi(action[2]);
170   dst = std::stoi(action[3]);
171   tag = std::stoi(action[4]);
172 }
173
174 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 {
176   CHECK_ACTION_PARAMS(action, 3, 1)
177   partner = std::stoi(action[2]);
178   tag     = std::stoi(action[3]);
179   size      = parse_integer<size_t>(action[4]);
180   datatype1 = parse_datatype(action, 5);
181 }
182
183 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
184 {
185   CHECK_ACTION_PARAMS(action, 1, 0)
186   flops = parse_double(action[2]);
187 }
188
189 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
190 {
191   CHECK_ACTION_PARAMS(action, 1, 0)
192   time = parse_double(action[2]);
193 }
194
195 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
196 {
197   CHECK_ACTION_PARAMS(action, 2, 0)
198   filename = std::string(action[2]);
199   line = std::stoi(action[3]);
200 }
201
202 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
203 {
204   CHECK_ACTION_PARAMS(action, 1, 2)
205   size      = parse_integer<size_t>(action[2]);
206   root      = parse_root(action, 3);
207   datatype1 = parse_datatype(action, 4);
208 }
209
210 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 2)
213   comm_size = parse_integer<unsigned>(action[2]);
214   comp_size = parse_double(action[3]);
215   root      = parse_root(action, 4);
216   datatype1 = parse_datatype(action, 5);
217 }
218
219 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
220 {
221   CHECK_ACTION_PARAMS(action, 2, 1)
222   comm_size = parse_integer<unsigned>(action[2]);
223   comp_size = parse_double(action[3]);
224   datatype1 = parse_datatype(action, 4);
225 }
226
227 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
228 {
229   CHECK_ACTION_PARAMS(action, 2, 1)
230   comm_size = MPI_COMM_WORLD->size();
231   send_size = parse_integer<int>(action[2]);
232   recv_size = parse_integer<int>(action[3]);
233   datatype1 = parse_datatype(action, 4);
234   datatype2 = parse_datatype(action, 5);
235 }
236
237 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
238 {
239   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
240         0 gather 68 68 0 0 0
241       where:
242         1) 68 is the sendcounts
243         2) 68 is the recvcounts
244         3) 0 is the root node
245         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
246         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
247   */
248   CHECK_ACTION_PARAMS(action, 2, 3)
249   comm_size = MPI_COMM_WORLD->size();
250   send_size = parse_integer<int>(action[2]);
251   recv_size = parse_integer<int>(action[3]);
252
253   if (name == "gather") {
254     root      = parse_root(action, 4);
255     datatype1 = parse_datatype(action, 5);
256     datatype2 = parse_datatype(action, 6);
257   } else {
258     root      = 0;
259     datatype1 = parse_datatype(action, 4);
260     datatype2 = parse_datatype(action, 5);
261   }
262 }
263
264 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
265 {
266   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
267        0 gather 68 68 10 10 10 0 0 0
268      where:
269        1) 68 is the sendcount
270        2) 68 10 10 10 is the recvcounts
271        3) 0 is the root node
272        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
273        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
274   */
275   comm_size = MPI_COMM_WORLD->size();
276   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
277   send_size  = parse_integer<int>(action[2]);
278   disps      = std::vector<int>(comm_size, 0);
279   recvcounts = std::make_shared<std::vector<int>>(comm_size);
280
281   if (name == "gatherv") {
282     root      = parse_root(action, 3 + comm_size);
283     datatype1 = parse_datatype(action, 4 + comm_size);
284     datatype2 = parse_datatype(action, 5 + comm_size);
285   } else {
286     root                = 0;
287     unsigned disp_index = 0;
288     /* The 3 comes from "0 gather <sendcount>", which must always be present.
289      * The + comm_size is the recvcounts array, which must also be present
290      */
291     if (action.size() > 3 + comm_size + comm_size) {
292       // datatype + disp are specified
293       datatype1  = parse_datatype(action, 3 + comm_size);
294       datatype2  = parse_datatype(action, 4 + comm_size);
295       disp_index = 5 + comm_size;
296     } else if (action.size() > 3 + comm_size + 2) {
297       // disps specified; datatype is not specified; use the default one
298       datatype1  = MPI_DEFAULT_TYPE;
299       datatype2  = MPI_DEFAULT_TYPE;
300       disp_index = 3 + comm_size;
301     } else {
302       // no disp specified, maybe only datatype,
303       datatype1 = parse_datatype(action, 3 + comm_size);
304       datatype2 = parse_datatype(action, 4 + comm_size);
305     }
306
307     if (disp_index != 0) {
308       xbt_assert(disp_index + comm_size <= action.size());
309       for (unsigned i = 0; i < comm_size; i++)
310         disps[i]          = std::stoi(action[disp_index + i]);
311     }
312   }
313
314   for (unsigned int i = 0; i < comm_size; i++) {
315     (*recvcounts)[i] = std::stoi(action[i + 3]);
316   }
317   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
318 }
319
320 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
321 {
322   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
323         0 gather 68 68 0 0 0
324       where:
325         1) 68 is the sendcounts
326         2) 68 is the recvcounts
327         3) 0 is the root node
328         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
329         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
330   */
331   comm_size = MPI_COMM_WORLD->size();
332   CHECK_ACTION_PARAMS(action, 2, 3)
333   comm_size = MPI_COMM_WORLD->size();
334   send_size = parse_integer<int>(action[2]);
335   recv_size = parse_integer<int>(action[3]);
336   root      = parse_root(action, 4);
337   datatype1 = parse_datatype(action, 5);
338   datatype2 = parse_datatype(action, 6);
339 }
340
341 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
342 {
343   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
344      0 gather 68 10 10 10 68 0 0 0
345       where:
346       1) 68 10 10 10 is the sendcounts
347       2) 68 is the recvcount
348       3) 0 is the root node
349       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
350       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
351   */
352   comm_size = MPI_COMM_WORLD->size();
353   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
354   recv_size  = parse_integer<int>(action[2 + comm_size]);
355   disps      = std::vector<int>(comm_size, 0);
356   sendcounts = std::make_shared<std::vector<int>>(comm_size);
357
358   root      = parse_root(action, 3 + comm_size);
359   datatype1 = parse_datatype(action, 4 + comm_size);
360   datatype2 = parse_datatype(action, 5 + comm_size);
361
362   for (unsigned int i = 0; i < comm_size; i++) {
363     (*sendcounts)[i] = std::stoi(action[i + 2]);
364   }
365   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
366 }
367
368 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
369 {
370   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
371        0 reducescatter 275427 275427 275427 204020 11346849 0
372      where:
373        1) The first four values after the name of the action declare the recvcounts array
374        2) The value 11346849 is the amount of instructions
375        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
376   */
377   comm_size = MPI_COMM_WORLD->size();
378   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
379   comp_size  = parse_double(action[2 + comm_size]);
380   recvcounts = std::make_shared<std::vector<int>>(comm_size);
381   datatype1  = parse_datatype(action, 3 + comm_size);
382
383   for (unsigned int i = 0; i < comm_size; i++) {
384     (*recvcounts)[i]= std::stoi(action[i + 2]);
385   }
386   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
387 }
388
389 void ScanArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
390 {
391   CHECK_ACTION_PARAMS(action, 2, 1)
392   size      = parse_integer<size_t>(action[2]);
393   comp_size = parse_double(action[3]);
394   datatype1 = parse_datatype(action, 4);
395 }
396
397 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
398 {
399   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
400         0 alltoallv 100 1 7 10 12 100 1 70 10 5
401      where:
402       1) 100 is the size of the send buffer *sizeof(int),
403       2) 1 7 10 12 is the sendcounts array
404       3) 100*sizeof(int) is the size of the receiver buffer
405       4)  1 70 10 5 is the recvcounts array
406   */
407   comm_size = MPI_COMM_WORLD->size();
408   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
409   sendcounts = std::make_shared<std::vector<int>>(comm_size);
410   recvcounts = std::make_shared<std::vector<int>>(comm_size);
411   senddisps  = std::vector<int>(comm_size, 0);
412   recvdisps  = std::vector<int>(comm_size, 0);
413
414   datatype1 = parse_datatype(action, 4 + 2 * comm_size);
415   datatype2 = parse_datatype(action, 5 + 2 * comm_size);
416
417   send_buf_size = parse_integer<int>(action[2]);
418   recv_buf_size = parse_integer<int>(action[3 + comm_size]);
419   for (unsigned int i = 0; i < comm_size; i++) {
420     (*sendcounts)[i] = std::stoi(action[3 + i]);
421     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
422   }
423   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
424   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
425 }
426
427 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
428 {
429   std::string s = boost::algorithm::join(action, " ");
430   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
431   const WaitTestParser& args = get_args();
432   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
433
434   if (request == MPI_REQUEST_NULL) {
435     /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
436      * return.*/
437     return;
438   }
439
440   // Must be taken before Request::wait() since the request may be set to
441   // MPI_REQUEST_NULL by Request::wait!
442   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
443
444   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("wait", args.src, args.dst, args.tag));
445
446   MPI_Status status;
447   Request::wait(&request, &status);
448   if(request!=MPI_REQUEST_NULL)
449     Request::unref(&request);
450   TRACE_smpi_comm_out(get_pid());
451   if (is_wait_for_receive)
452     TRACE_smpi_recv(MPI_COMM_WORLD->group()->actor(args.src), MPI_COMM_WORLD->group()->actor(args.dst), args.tag);
453 }
454
455 void SendAction::kernel(simgrid::xbt::ReplayAction&)
456 {
457   const SendRecvParser& args = get_args();
458   aid_t dst_traced           = MPI_COMM_WORLD->group()->actor(args.partner);
459
460   TRACE_smpi_comm_in(
461       get_pid(), __func__,
462       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
463   if (not TRACE_smpi_view_internals())
464     TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
465
466   if (get_name() == "send") {
467     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
468   } else if (get_name() == "isend") {
469     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
470     req_storage.add(request);
471   } else {
472     xbt_die("Don't know this action, %s", get_name().c_str());
473   }
474
475   TRACE_smpi_comm_out(get_pid());
476 }
477
478 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
479 {
480   const SendRecvParser& args = get_args();
481   TRACE_smpi_comm_in(
482       get_pid(), __func__,
483       new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
484
485   MPI_Status status;
486   // unknown size from the receiver point of view
487   size_t arg_size = args.size;
488   if (arg_size == 0) {
489     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
490     arg_size = status.count;
491   }
492
493   bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
494   if (get_name() == "recv") {
495     is_recv = true;
496     Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
497   } else if (get_name() == "irecv") {
498     MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
499     req_storage.add(request);
500   } else {
501     THROW_IMPOSSIBLE;
502   }
503
504   TRACE_smpi_comm_out(get_pid());
505   if (is_recv && not TRACE_smpi_view_internals()) {
506     aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
507     TRACE_smpi_recv(src_traced, get_pid(), args.tag);
508   }
509 }
510
511 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
512 {
513   const ComputeParser& args = get_args();
514   if (smpi_cfg_simulate_computation()) {
515     smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
516   }
517 }
518
519 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
520 {
521   const SleepParser& args = get_args();
522   XBT_DEBUG("Sleep for: %lf secs", args.time);
523   aid_t pid = simgrid::s4u::this_actor::get_pid();
524   TRACE_smpi_sleeping_in(pid, args.time);
525   simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
526   TRACE_smpi_sleeping_out(pid);
527 }
528
529 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
530 {
531   const LocationParser& args = get_args();
532   smpi_trace_set_call_location(args.filename.c_str(), args.line);
533 }
534
535 void TestAction::kernel(simgrid::xbt::ReplayAction&)
536 {
537   const WaitTestParser& args = get_args();
538   MPI_Request request = req_storage.pop(args.src, args.dst, args.tag);
539   // if request is null here, this may mean that a previous test has succeeded
540   // Different times in traced application and replayed version may lead to this
541   // In this case, ignore the extra calls.
542   if (request != MPI_REQUEST_NULL) {
543     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::WaitTIData("test", args.src, args.dst, args.tag));
544
545     MPI_Status status;
546     int flag = 0;
547     Request::test(&request, &status, &flag);
548
549     XBT_DEBUG("MPI_Test result: %d", flag);
550     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
551      * nullptr.*/
552     if (request == MPI_REQUEST_NULL)
553       req_storage.addNullRequest(args.src, args.dst, args.tag);
554     else
555       req_storage.add(request);
556
557     TRACE_smpi_comm_out(get_pid());
558   }
559 }
560
561 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
562 {
563   CHECK_ACTION_PARAMS(action, 0, 1)
564     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
565     : MPI_BYTE;  // default TAU datatype
566
567   /* start a simulated timer */
568   smpi_process()->simulated_start();
569 }
570
571 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
572 {
573   /* nothing to do */
574 }
575
576 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
577 {
578   if (req_storage.size() > 0) {
579     std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
580     std::vector<MPI_Request> reqs;
581     req_storage.get_requests(reqs);
582     unsigned long count_requests = reqs.size();
583     TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::CpuTIData("waitall", count_requests));
584     for (auto const& req : reqs) {
585       if (req && (req->flags() & MPI_REQ_RECV)) {
586         sender_receiver.emplace_back(req->src(), req->dst());
587       }
588     }
589     Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
590     req_storage.get_store().clear();
591
592     for (MPI_Request& req : reqs)
593       if (req != MPI_REQUEST_NULL)
594         Request::unref(&req);
595
596     for (auto const& pair : sender_receiver) {
597       TRACE_smpi_recv(pair.first, pair.second, 0);
598     }
599     TRACE_smpi_comm_out(get_pid());
600   }
601 }
602
603 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
604 {
605   TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
606   colls::barrier(MPI_COMM_WORLD);
607   TRACE_smpi_comm_out(get_pid());
608 }
609
610 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
611 {
612   const BcastArgParser& args = get_args();
613   TRACE_smpi_comm_in(get_pid(), "action_bcast",
614                      new simgrid::instr::CollTIData("bcast", args.root, -1.0, args.size,
615                                                     0, Datatype::encode(args.datatype1), ""));
616
617   colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
618
619   TRACE_smpi_comm_out(get_pid());
620 }
621
622 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
623 {
624   const ReduceArgParser& args = get_args();
625   TRACE_smpi_comm_in(get_pid(), "action_reduce",
626                      new simgrid::instr::CollTIData("reduce", args.root, args.comp_size,
627                                                     args.comm_size, 0, Datatype::encode(args.datatype1), ""));
628
629   colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
630                 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
631                 args.root, MPI_COMM_WORLD);
632   if (args.comp_size != 0.0)
633     simgrid::s4u::this_actor::exec_init(args.comp_size)
634       ->set_name("computation")
635       ->start()
636       ->wait();
637
638   TRACE_smpi_comm_out(get_pid());
639 }
640
641 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
642 {
643   const AllReduceArgParser& args = get_args();
644   TRACE_smpi_comm_in(get_pid(), "action_allreduce",
645                      new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, 0,
646                                                     Datatype::encode(args.datatype1), ""));
647
648   colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
649                    recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
650                    MPI_COMM_WORLD);
651   if (args.comp_size != 0.0)
652     simgrid::s4u::this_actor::exec_init(args.comp_size)
653       ->set_name("computation")
654       ->start()
655       ->wait();
656
657   TRACE_smpi_comm_out(get_pid());
658 }
659
660 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
661 {
662   const AllToAllArgParser& args = get_args();
663   TRACE_smpi_comm_in(get_pid(), "action_alltoall",
664                      new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
665                                                     Datatype::encode(args.datatype1),
666                                                     Datatype::encode(args.datatype2)));
667
668   colls::alltoall(send_buffer(args.datatype1->size() * args.send_size * args.comm_size), args.send_size, args.datatype1,
669                   recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size), args.recv_size, args.datatype2,
670                   MPI_COMM_WORLD);
671
672   TRACE_smpi_comm_out(get_pid());
673 }
674
675 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
676 {
677   const GatherArgParser& args = get_args();
678   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
679                      new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
680                                                     args.send_size, args.recv_size, Datatype::encode(args.datatype1),
681                                                     Datatype::encode(args.datatype2)));
682
683   if (get_name() == "gather") {
684     int rank = MPI_COMM_WORLD->rank();
685     colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
686                   (rank == args.root) ? recv_buffer(args.datatype2->size() * args.recv_size * args.comm_size) : nullptr,
687                   args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
688   } else
689     colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
690                      recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
691                      MPI_COMM_WORLD);
692
693   TRACE_smpi_comm_out(get_pid());
694 }
695
696 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
697 {
698   int rank = MPI_COMM_WORLD->rank();
699   const GatherVArgParser& args = get_args();
700   TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
701                      new simgrid::instr::VarCollTIData(
702                          get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
703                          args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
704
705   if (get_name() == "gatherv") {
706     colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
707                    (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
708                    args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
709   } else {
710     colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
711                       recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
712                       args.disps.data(), args.datatype2, MPI_COMM_WORLD);
713   }
714
715   TRACE_smpi_comm_out(get_pid());
716 }
717
718 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
719 {
720   int rank = MPI_COMM_WORLD->rank();
721   const ScatterArgParser& args = get_args();
722   TRACE_smpi_comm_in(get_pid(), "action_scatter",
723                      new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
724                                                     Datatype::encode(args.datatype1),
725                                                     Datatype::encode(args.datatype2)));
726
727   colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
728                  (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
729                  args.datatype2, args.root, MPI_COMM_WORLD);
730
731   TRACE_smpi_comm_out(get_pid());
732 }
733
734 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
735 {
736   int rank = MPI_COMM_WORLD->rank();
737   const ScatterVArgParser& args = get_args();
738   TRACE_smpi_comm_in(get_pid(), "action_scatterv",
739                      new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
740                                                        nullptr, Datatype::encode(args.datatype1),
741                                                        Datatype::encode(args.datatype2)));
742
743   colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
744                   args.sendcounts->data(), args.disps.data(), args.datatype1,
745                   recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
746                   MPI_COMM_WORLD);
747
748   TRACE_smpi_comm_out(get_pid());
749 }
750
751 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
752 {
753   const ReduceScatterArgParser& args = get_args();
754   TRACE_smpi_comm_in(
755       get_pid(), "action_reducescatter",
756       new simgrid::instr::VarCollTIData(get_name(), -1, -1, nullptr, -1, args.recvcounts,
757                                         /* ugly as we use datatype field to pass computation as string */
758                                         /* and because of the trick to avoid getting 0.000000 when 0 is given */
759                                         args.comp_size == 0 ? "0" : std::to_string(args.comp_size),
760                                         Datatype::encode(args.datatype1)));
761
762   colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
763                         recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
764                         args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
765   if (args.comp_size != 0.0)
766     simgrid::s4u::this_actor::exec_init(args.comp_size)
767       ->set_name("computation")
768       ->start()
769       ->wait();
770   TRACE_smpi_comm_out(get_pid());
771 }
772
773 void ScanAction::kernel(simgrid::xbt::ReplayAction&)
774 {
775   const ScanArgParser& args = get_args();
776   TRACE_smpi_comm_in(get_pid(), "action_scan",
777                      new simgrid::instr::CollTIData(get_name(), -1, args.comp_size,
778                      args.size, 0, Datatype::encode(args.datatype1), ""));
779   if (get_name() == "scan")
780     colls::scan(send_buffer(args.size * args.datatype1->size()),
781               recv_buffer(args.size * args.datatype1->size()), args.size,
782               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
783   else
784     colls::exscan(send_buffer(args.size * args.datatype1->size()),
785               recv_buffer(args.size * args.datatype1->size()), args.size,
786               args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
787
788   if (args.comp_size != 0.0)
789     simgrid::s4u::this_actor::exec_init(args.comp_size)
790       ->set_name("computation")
791       ->start()
792       ->wait();
793   TRACE_smpi_comm_out(get_pid());
794 }
795
796 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
797 {
798   const AllToAllVArgParser& args = get_args();
799   TRACE_smpi_comm_in(get_pid(), __func__,
800                      new simgrid::instr::VarCollTIData(
801                          "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
802                          Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
803
804   colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
805                    args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
806                    args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
807
808   TRACE_smpi_comm_out(get_pid());
809 }
810 } // Replay Namespace
811 }} // namespace simgrid::smpi
812
813 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
814 /** @brief Only initialize the replay, don't do it for real */
815 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
816 {
817   xbt_assert(not smpi_process()->initializing());
818
819   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
820   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
821   simgrid::smpi::ActorExt::init();
822
823   smpi_process()->mark_as_initialized();
824   smpi_process()->set_replaying(true);
825
826   TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
827   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
828   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
829   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
830   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
831   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
832   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
833   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
834   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
835   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
836   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
837   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
838   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
839   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
840   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
841   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
842   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
843   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
844   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
845   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
846   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
847   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
848   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
849   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
850   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
851   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
852   xbt_replay_action_register("scan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("scan").execute(action); });
853   xbt_replay_action_register("exscan", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScanAction("exscan").execute(action); });
854   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
855   xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
856   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
857
858   //if we have a delayed start, sleep here.
859   if (start_delay_flops > 0) {
860     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
861     private_execute_flops(start_delay_flops);
862   } else {
863     // Wait for the other actors to initialize also
864     simgrid::s4u::this_actor::yield();
865   }
866   if(_smpi_init_sleep > 0)
867     simgrid::s4u::this_actor::sleep_for(_smpi_init_sleep);
868 }
869
870 /** @brief actually run the replay after initialization */
871 void smpi_replay_main(int rank, const char* private_trace_filename)
872 {
873   static int active_processes = 0;
874   active_processes++;
875   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
876   std::string rank_string                      = std::to_string(rank);
877   simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
878
879   /* and now, finalize everything */
880   /* One active process will stop. Decrease the counter*/
881   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
882   XBT_DEBUG("There are %u elements in reqq[*]", count_requests);
883   if (count_requests > 0) {
884     std::vector<MPI_Request> requests(count_requests);
885     unsigned int i=0;
886
887     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
888       for (auto& req: pair.second){
889         requests[i] = req;
890       }
891       i++;
892     }
893     simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
894   }
895
896   if(simgrid::config::get_value<bool>("smpi/finalization-barrier"))
897     simgrid::smpi::colls::barrier(MPI_COMM_WORLD);
898
899   active_processes--;
900
901   if(active_processes==0){
902     /* Last process alive speaking: end the simulated timer */
903     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
904     smpi_free_replay_tmp_buffers();
905   }
906
907   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
908                      new simgrid::instr::NoOpTIData("finalize"));
909
910   smpi_process()->finalize();
911
912   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
913 }
914
915 /** @brief chain a replay initialization and a replay start */
916 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
917 {
918   smpi_replay_init(instance_id, rank, start_delay_flops);
919   smpi_replay_main(rank, private_trace_filename);
920 }