Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Move ReplayAction definitions to replay.hpp
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14 #include <simgrid/smpi/replay.hpp>
15
16 #include <boost/algorithm/string/join.hpp>
17 #include <memory>
18 #include <numeric>
19 #include <unordered_map>
20 #include <vector>
21
22 #include <tuple>
23 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
24 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
25 // this could go into a header file.
26 namespace hash_tuple {
27 template <typename TT> class hash {
28 public:
29   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
30 };
31
32 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
33 {
34   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
35 }
36
37 // Recursive template code derived from Matthieu M.
38 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
39 public:
40   static void apply(size_t& seed, Tuple const& tuple)
41   {
42     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
43     hash_combine(seed, std::get<Index>(tuple));
44   }
45 };
46
47 template <class Tuple> class HashValueImpl<Tuple, 0> {
48 public:
49   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
50 };
51
52 template <typename... TT> class hash<std::tuple<TT...>> {
53 public:
54   size_t operator()(std::tuple<TT...> const& tt) const
55   {
56     size_t seed = 0;
57     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
58     return seed;
59   }
60 };
61 }
62
63 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
64
65 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
66 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67
68
69 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
70 {
71   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
72     std::string s = boost::algorithm::join(action, " ");
73     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
74   }
75 }
76
77 /* Helper function */
78 static double parse_double(std::string string)
79 {
80   return xbt_str_parse_double(string.c_str(), "%s is not a double");
81 }
82
83 namespace simgrid {
84 namespace smpi {
85
86 namespace replay {
87 MPI_Datatype MPI_DEFAULT_TYPE;
88
89 class RequestStorage {
90 private:
91     req_storage_t store;
92
93 public:
94     RequestStorage() {}
95     int size()
96     {
97       return store.size();
98     }
99
100     req_storage_t& get_store()
101     {
102       return store;
103     }
104
105     void get_requests(std::vector<MPI_Request>& vec)
106     {
107       for (auto& pair : store) {
108         auto& req = pair.second;
109         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
110         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
111           vec.push_back(pair.second);
112           pair.second->print_request("MM");
113         }
114       }
115     }
116
117     MPI_Request find(int src, int dst, int tag)
118     {
119       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
120       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121     }
122
123     void remove(MPI_Request req)
124     {
125       if (req == MPI_REQUEST_NULL) return;
126
127       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128     }
129
130     void add(MPI_Request req)
131     {
132       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
133         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134     }
135
136     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
137     void addNullRequest(int src, int dst, int tag)
138     {
139       store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
140     }
141 };
142
143 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
144 {
145   CHECK_ACTION_PARAMS(action, 3, 0)
146   src = std::stoi(action[2]);
147   dst = std::stoi(action[3]);
148   tag = std::stoi(action[4]);
149 }
150
151 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
152 {
153   CHECK_ACTION_PARAMS(action, 3, 1)
154   partner = std::stoi(action[2]);
155   tag     = std::stoi(action[3]);
156   size    = parse_double(action[4]);
157   if (action.size() > 5)
158     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
159 }
160
161 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
162 {
163   CHECK_ACTION_PARAMS(action, 1, 0)
164   flops = parse_double(action[2]);
165 }
166
167 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
168 {
169   CHECK_ACTION_PARAMS(action, 1, 2)
170   size = parse_double(action[2]);
171   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
172   if (action.size() > 4)
173     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
174 }
175
176 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
177 {
178   CHECK_ACTION_PARAMS(action, 2, 2)
179   comm_size = parse_double(action[2]);
180   comp_size = parse_double(action[3]);
181   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
182   if (action.size() > 5)
183     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
184 }
185
186 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
187 {
188   CHECK_ACTION_PARAMS(action, 2, 1)
189   comm_size = parse_double(action[2]);
190   comp_size = parse_double(action[3]);
191   if (action.size() > 4)
192     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
193 }
194
195 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
196 {
197   CHECK_ACTION_PARAMS(action, 2, 1)
198   comm_size = MPI_COMM_WORLD->size();
199   send_size = parse_double(action[2]);
200   recv_size = parse_double(action[3]);
201
202   if (action.size() > 4)
203     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
204   if (action.size() > 5)
205     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
206 }
207
208 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
209 {
210   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
211         0 gather 68 68 0 0 0
212       where:
213         1) 68 is the sendcounts
214         2) 68 is the recvcounts
215         3) 0 is the root node
216         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
217         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
218   */
219   CHECK_ACTION_PARAMS(action, 2, 3)
220   comm_size = MPI_COMM_WORLD->size();
221   send_size = parse_double(action[2]);
222   recv_size = parse_double(action[3]);
223
224   if (name == "gather") {
225     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
226     if (action.size() > 5)
227       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
228     if (action.size() > 6)
229       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
230   } else {
231     if (action.size() > 4)
232       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
233     if (action.size() > 5)
234       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
235   }
236 }
237
238 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
239 {
240   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
241        0 gather 68 68 10 10 10 0 0 0
242      where:
243        1) 68 is the sendcount
244        2) 68 10 10 10 is the recvcounts
245        3) 0 is the root node
246        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
247        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
248   */
249   comm_size = MPI_COMM_WORLD->size();
250   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
251   send_size  = parse_double(action[2]);
252   disps      = std::vector<int>(comm_size, 0);
253   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
254
255   if (name == "gatherV") {
256     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
257     if (action.size() > 4 + comm_size)
258       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
259     if (action.size() > 5 + comm_size)
260       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
261   } else {
262     int datatype_index = 0;
263     int disp_index     = 0;
264     /* The 3 comes from "0 gather <sendcount>", which must always be present.
265      * The + comm_size is the recvcounts array, which must also be present
266      */
267     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
268       datatype_index = 3 + comm_size;
269       disp_index     = datatype_index + 1;
270       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
271       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
272     } else if (action.size() >
273                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
274       disp_index = 3 + comm_size;
275     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
276       datatype_index = 3 + comm_size;
277       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
278       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
279     }
280
281     if (disp_index != 0) {
282       for (unsigned int i = 0; i < comm_size; i++)
283         disps[i]          = std::stoi(action[disp_index + i]);
284     }
285   }
286
287   for (unsigned int i = 0; i < comm_size; i++) {
288     (*recvcounts)[i] = std::stoi(action[i + 3]);
289   }
290   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
291 }
292
293 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
294 {
295   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
296         0 gather 68 68 0 0 0
297       where:
298         1) 68 is the sendcounts
299         2) 68 is the recvcounts
300         3) 0 is the root node
301         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
302         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
303   */
304   CHECK_ACTION_PARAMS(action, 2, 3)
305   comm_size = MPI_COMM_WORLD->size();
306   send_size = parse_double(action[2]);
307   recv_size = parse_double(action[3]);
308   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
309   if (action.size() > 5)
310     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
311   if (action.size() > 6)
312     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
313 }
314
315 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
316 {
317   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
318      0 gather 68 10 10 10 68 0 0 0
319       where:
320       1) 68 10 10 10 is the sendcounts
321       2) 68 is the recvcount
322       3) 0 is the root node
323       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
324       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
325   */
326   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
327   recv_size  = parse_double(action[2 + comm_size]);
328   disps      = std::vector<int>(comm_size, 0);
329   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
330
331   if (action.size() > 5 + comm_size)
332     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
333   if (action.size() > 5 + comm_size)
334     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
335
336   for (unsigned int i = 0; i < comm_size; i++) {
337     (*sendcounts)[i] = std::stoi(action[i + 2]);
338   }
339   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
340   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
341 }
342
343 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
344 {
345   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
346        0 reduceScatter 275427 275427 275427 204020 11346849 0
347      where:
348        1) The first four values after the name of the action declare the recvcounts array
349        2) The value 11346849 is the amount of instructions
350        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
351   */
352   comm_size = MPI_COMM_WORLD->size();
353   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
354   comp_size  = parse_double(action[2 + comm_size]);
355   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
356   if (action.size() > 3 + comm_size)
357     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
358
359   for (unsigned int i = 0; i < comm_size; i++) {
360     recvcounts->push_back(std::stoi(action[i + 2]));
361   }
362   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
363 }
364
365 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
366 {
367   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
368         0 allToAllV 100 1 7 10 12 100 1 70 10 5
369      where:
370       1) 100 is the size of the send buffer *sizeof(int),
371       2) 1 7 10 12 is the sendcounts array
372       3) 100*sizeof(int) is the size of the receiver buffer
373       4)  1 70 10 5 is the recvcounts array
374   */
375   comm_size = MPI_COMM_WORLD->size();
376   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
377   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
378   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379   senddisps  = std::vector<int>(comm_size, 0);
380   recvdisps  = std::vector<int>(comm_size, 0);
381
382   if (action.size() > 5 + 2 * comm_size)
383     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
384   if (action.size() > 5 + 2 * comm_size)
385     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
386
387   send_buf_size = parse_double(action[2]);
388   recv_buf_size = parse_double(action[3 + comm_size]);
389   for (unsigned int i = 0; i < comm_size; i++) {
390     (*sendcounts)[i] = std::stoi(action[3 + i]);
391     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
392   }
393   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
394   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
395 }
396
397 template<class T>
398 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
399 {
400   // Needs to be re-initialized for every action, hence here
401   double start_time = smpi_process()->simulated_elapsed();
402   args.parse(action, name);
403   kernel(action);
404   if (name != "Init")
405     log_timed_action(action, start_time);
406 }
407
408   void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
409   {
410     std::string s = boost::algorithm::join(action, " ");
411     xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
412     MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
413     req_storage.remove(request);
414
415     if (request == MPI_REQUEST_NULL) {
416       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
417        * return.*/
418       return;
419     }
420
421     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
422
423     // Must be taken before Request::wait() since the request may be set to
424     // MPI_REQUEST_NULL by Request::wait!
425     bool is_wait_for_receive = (request->flags() & RECV);
426     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
427     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
428
429     MPI_Status status;
430     Request::wait(&request, &status);
431
432     TRACE_smpi_comm_out(rank);
433     if (is_wait_for_receive)
434       TRACE_smpi_recv(args.src, args.dst, args.tag);
435   }
436
437   void SendAction::kernel(simgrid::xbt::ReplayAction& action)
438   {
439     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
440
441     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
442                                                                              args.tag, Datatype::encode(args.datatype1)));
443     if (not TRACE_smpi_view_internals())
444       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
445
446     if (name == "send") {
447       Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
448     } else if (name == "Isend") {
449       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
450       req_storage.add(request);
451     } else {
452       xbt_die("Don't know this action, %s", name.c_str());
453     }
454
455     TRACE_smpi_comm_out(my_proc_id);
456   }
457
458   void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
459   {
460     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
461
462     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
463                                                                              args.tag, Datatype::encode(args.datatype1)));
464
465     MPI_Status status;
466     // unknown size from the receiver point of view
467     if (args.size <= 0.0) {
468       Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
469       args.size = status.count;
470     }
471
472     if (name == "recv") {
473       Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
474     } else if (name == "Irecv") {
475       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
476       req_storage.add(request);
477     }
478
479     TRACE_smpi_comm_out(my_proc_id);
480     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
481     if (name == "recv" && not TRACE_smpi_view_internals()) {
482       TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
483     }
484   }
485
486   void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
487   {
488     TRACE_smpi_computing_in(my_proc_id, args.flops);
489     smpi_execute_flops(args.flops);
490     TRACE_smpi_computing_out(my_proc_id);
491   }
492
493   void TestAction::kernel(simgrid::xbt::ReplayAction& action)
494   {
495     MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
496     req_storage.remove(request);
497     // if request is null here, this may mean that a previous test has succeeded
498     // Different times in traced application and replayed version may lead to this
499     // In this case, ignore the extra calls.
500     if (request != MPI_REQUEST_NULL) {
501       TRACE_smpi_testing_in(my_proc_id);
502
503       MPI_Status status;
504       int flag = Request::test(&request, &status);
505
506       XBT_DEBUG("MPI_Test result: %d", flag);
507       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
508        * nullptr.*/
509       if (request == MPI_REQUEST_NULL)
510         req_storage.addNullRequest(args.src, args.dst, args.tag);
511       else
512         req_storage.add(request);
513
514       TRACE_smpi_testing_out(my_proc_id);
515     }
516   }
517
518   void InitAction::kernel(simgrid::xbt::ReplayAction& action)
519   {
520     CHECK_ACTION_PARAMS(action, 0, 1)
521     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
522                                            : MPI_BYTE;  // default TAU datatype
523
524     /* start a simulated timer */
525     smpi_process()->simulated_start();
526   }
527
528   void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
529   { 
530     /* nothing to do */
531   }
532
533   void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
534   {
535     const unsigned int count_requests = req_storage.size();
536
537     if (count_requests > 0) {
538       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
539       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
540       std::vector<MPI_Request> reqs;
541       req_storage.get_requests(reqs);
542       for (const auto& req : reqs) {
543         if (req && (req->flags() & RECV)) {
544           sender_receiver.push_back({req->src(), req->dst()});
545         }
546       }
547       MPI_Status status[count_requests];
548       Request::waitall(count_requests, &(reqs.data())[0], status);
549       req_storage.get_store().clear();
550
551       for (auto& pair : sender_receiver) {
552         TRACE_smpi_recv(pair.first, pair.second, 0);
553       }
554       TRACE_smpi_comm_out(my_proc_id);
555     }
556   }
557
558   void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
559   {
560     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
561     Colls::barrier(MPI_COMM_WORLD);
562     TRACE_smpi_comm_out(my_proc_id);
563   }
564
565   void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
566   {
567     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
568                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
569                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
570
571     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
572
573     TRACE_smpi_comm_out(my_proc_id);
574   }
575
576   void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
577   {
578     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
579                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
580                                                       args.comp_size, args.comm_size, -1,
581                                                       Datatype::encode(args.datatype1), ""));
582
583     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
584         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
585     smpi_execute_flops(args.comp_size);
586
587     TRACE_smpi_comm_out(my_proc_id);
588   }
589
590   void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
591   {
592     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
593                                                                                 Datatype::encode(args.datatype1), ""));
594
595     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
596         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
597     smpi_execute_flops(args.comp_size);
598
599     TRACE_smpi_comm_out(my_proc_id);
600   }
601
602   void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
603   {
604     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
605                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
606                                                     Datatype::encode(args.datatype1),
607                                                     Datatype::encode(args.datatype2)));
608
609     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
610                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
611                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
612
613     TRACE_smpi_comm_out(my_proc_id);
614   }
615
616   void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
617   {
618     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
619                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
620
621     if (name == "gather") {
622       int rank = MPI_COMM_WORLD->rank();
623       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
624                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
625     }
626     else
627       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
628                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
629
630     TRACE_smpi_comm_out(my_proc_id);
631   }
632
633   void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
634   {
635     int rank = MPI_COMM_WORLD->rank();
636
637     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
638                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
639                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
640
641     if (name == "gatherV") {
642       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
643                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
644                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
645     }
646     else {
647       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
648                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
649                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
650     }
651
652     TRACE_smpi_comm_out(my_proc_id);
653   }
654
655   void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
656   {
657     int rank = MPI_COMM_WORLD->rank();
658     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
659                                                                           Datatype::encode(args.datatype1),
660                                                                           Datatype::encode(args.datatype2)));
661
662     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
663                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
664
665     TRACE_smpi_comm_out(my_proc_id);
666   }
667
668   void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
669   {
670     int rank = MPI_COMM_WORLD->rank();
671     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
672           nullptr, Datatype::encode(args.datatype1),
673           Datatype::encode(args.datatype2)));
674
675     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
676                     args.sendcounts->data(), args.disps.data(), args.datatype1,
677                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
678                     MPI_COMM_WORLD);
679
680     TRACE_smpi_comm_out(my_proc_id);
681   }
682
683   void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
684   {
685     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
686                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
687                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
688                                                          Datatype::encode(args.datatype1)));
689
690     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
691                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
692                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
693
694     smpi_execute_flops(args.comp_size);
695     TRACE_smpi_comm_out(my_proc_id);
696   }
697
698   void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
699   {
700     TRACE_smpi_comm_in(my_proc_id, __func__,
701                        new simgrid::instr::VarCollTIData(
702                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
703                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
704
705     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
706                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
707
708     TRACE_smpi_comm_out(my_proc_id);
709   }
710 } // Replay Namespace
711 }} // namespace simgrid::smpi
712
713 std::vector<simgrid::smpi::replay::RequestStorage> storage;
714 /** @brief Only initialize the replay, don't do it for real */
715 void smpi_replay_init(int* argc, char*** argv)
716 {
717   simgrid::smpi::Process::init(argc, argv);
718   smpi_process()->mark_as_initialized();
719   smpi_process()->set_replaying(true);
720
721   int my_proc_id = simgrid::s4u::this_actor::get_pid();
722   storage.resize(smpi_process_count());
723
724   TRACE_smpi_init(my_proc_id);
725   TRACE_smpi_computing_init(my_proc_id);
726   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
727   TRACE_smpi_comm_out(my_proc_id);
728   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
729   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
730   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
731   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
732   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
733   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
734   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
735   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
736   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
737   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
738   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
739   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
740   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
741   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
742   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
743   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
744   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
745   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
746   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
747   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
748   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
749   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
750   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
751   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
752   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
753   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
754
755   //if we have a delayed start, sleep here.
756   if(*argc>2){
757     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
758     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
759     smpi_execute_flops(value);
760   } else {
761     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
762     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
763     smpi_execute_flops(0.0);
764   }
765 }
766
767 /** @brief actually run the replay after initialization */
768 void smpi_replay_main(int* argc, char*** argv)
769 {
770   static int active_processes = 0;
771   active_processes++;
772   simgrid::xbt::replay_runner(*argc, *argv);
773
774   /* and now, finalize everything */
775   /* One active process will stop. Decrease the counter*/
776   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
777   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
778   if (count_requests > 0) {
779     MPI_Request requests[count_requests];
780     MPI_Status status[count_requests];
781     unsigned int i=0;
782
783     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
784       requests[i] = pair.second;
785       i++;
786     }
787     simgrid::smpi::Request::waitall(count_requests, requests, status);
788   }
789   active_processes--;
790
791   if(active_processes==0){
792     /* Last process alive speaking: end the simulated timer */
793     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
794     smpi_free_replay_tmp_buffers();
795   }
796
797   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
798                      new simgrid::instr::NoOpTIData("finalize"));
799
800   smpi_process()->finalize();
801
802   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
803   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
804 }
805
806 /** @brief chain a replay initialization and a replay start */
807 void smpi_replay_run(int* argc, char*** argv)
808 {
809   smpi_replay_init(argc, argv);
810   smpi_replay_main(argc, argv);
811 }