Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Add/remove headers
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
22 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
23 // this could go into a header file.
24 namespace hash_tuple {
25 template <typename TT> class hash {
26 public:
27   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
28 };
29
30 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
31 {
32   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
33 }
34
35 // Recursive template code derived from Matthieu M.
36 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
37 public:
38   static void apply(size_t& seed, Tuple const& tuple)
39   {
40     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
41     hash_combine(seed, std::get<Index>(tuple));
42   }
43 };
44
45 template <class Tuple> class HashValueImpl<Tuple, 0> {
46 public:
47   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
48 };
49
50 template <typename... TT> class hash<std::tuple<TT...>> {
51 public:
52   size_t operator()(std::tuple<TT...> const& tt) const
53   {
54     size_t seed = 0;
55     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
56     return seed;
57   }
58 };
59 }
60
61 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
62
63 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
64 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
65
66
67 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(std::string string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
138     }
139 };
140
141 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
142 {
143   CHECK_ACTION_PARAMS(action, 3, 0)
144   src = std::stoi(action[2]);
145   dst = std::stoi(action[3]);
146   tag = std::stoi(action[4]);
147 }
148
149 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
150 {
151   CHECK_ACTION_PARAMS(action, 3, 1)
152   partner = std::stoi(action[2]);
153   tag     = std::stoi(action[3]);
154   size    = parse_double(action[4]);
155   if (action.size() > 5)
156     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
157 }
158
159 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
160 {
161   CHECK_ACTION_PARAMS(action, 1, 0)
162   flops = parse_double(action[2]);
163 }
164
165 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
166 {
167   CHECK_ACTION_PARAMS(action, 1, 2)
168   size = parse_double(action[2]);
169   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
170   if (action.size() > 4)
171     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
172 }
173
174 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
175 {
176   CHECK_ACTION_PARAMS(action, 2, 2)
177   comm_size = parse_double(action[2]);
178   comp_size = parse_double(action[3]);
179   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
180   if (action.size() > 5)
181     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
182 }
183
184 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
185 {
186   CHECK_ACTION_PARAMS(action, 2, 1)
187   comm_size = parse_double(action[2]);
188   comp_size = parse_double(action[3]);
189   if (action.size() > 4)
190     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
191 }
192
193 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
194 {
195   CHECK_ACTION_PARAMS(action, 2, 1)
196   comm_size = MPI_COMM_WORLD->size();
197   send_size = parse_double(action[2]);
198   recv_size = parse_double(action[3]);
199
200   if (action.size() > 4)
201     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
202   if (action.size() > 5)
203     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
204 }
205
206 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
207 {
208   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
209         0 gather 68 68 0 0 0
210       where:
211         1) 68 is the sendcounts
212         2) 68 is the recvcounts
213         3) 0 is the root node
214         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
216   */
217   CHECK_ACTION_PARAMS(action, 2, 3)
218   comm_size = MPI_COMM_WORLD->size();
219   send_size = parse_double(action[2]);
220   recv_size = parse_double(action[3]);
221
222   if (name == "gather") {
223     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
224     if (action.size() > 5)
225       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
226     if (action.size() > 6)
227       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
228   } else {
229     if (action.size() > 4)
230       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
231     if (action.size() > 5)
232       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
233   }
234 }
235
236 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
237 {
238   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
239        0 gather 68 68 10 10 10 0 0 0
240      where:
241        1) 68 is the sendcount
242        2) 68 10 10 10 is the recvcounts
243        3) 0 is the root node
244        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
245        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
246   */
247   comm_size = MPI_COMM_WORLD->size();
248   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
249   send_size  = parse_double(action[2]);
250   disps      = std::vector<int>(comm_size, 0);
251   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
252
253   if (name == "gatherV") {
254     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
255     if (action.size() > 4 + comm_size)
256       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
257     if (action.size() > 5 + comm_size)
258       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
259   } else {
260     int datatype_index = 0;
261     int disp_index     = 0;
262     /* The 3 comes from "0 gather <sendcount>", which must always be present.
263      * The + comm_size is the recvcounts array, which must also be present
264      */
265     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
266       datatype_index = 3 + comm_size;
267       disp_index     = datatype_index + 1;
268       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
269       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
270     } else if (action.size() >
271                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
272       disp_index = 3 + comm_size;
273     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
274       datatype_index = 3 + comm_size;
275       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
276       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
277     }
278
279     if (disp_index != 0) {
280       for (unsigned int i = 0; i < comm_size; i++)
281         disps[i]          = std::stoi(action[disp_index + i]);
282     }
283   }
284
285   for (unsigned int i = 0; i < comm_size; i++) {
286     (*recvcounts)[i] = std::stoi(action[i + 3]);
287   }
288   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
289 }
290
291 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
292 {
293   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
294         0 gather 68 68 0 0 0
295       where:
296         1) 68 is the sendcounts
297         2) 68 is the recvcounts
298         3) 0 is the root node
299         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
300         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
301   */
302   CHECK_ACTION_PARAMS(action, 2, 3)
303   comm_size = MPI_COMM_WORLD->size();
304   send_size = parse_double(action[2]);
305   recv_size = parse_double(action[3]);
306   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
307   if (action.size() > 5)
308     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
309   if (action.size() > 6)
310     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
311 }
312
313 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
314 {
315   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
316      0 gather 68 10 10 10 68 0 0 0
317       where:
318       1) 68 10 10 10 is the sendcounts
319       2) 68 is the recvcount
320       3) 0 is the root node
321       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
322       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
323   */
324   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
325   recv_size  = parse_double(action[2 + comm_size]);
326   disps      = std::vector<int>(comm_size, 0);
327   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
328
329   if (action.size() > 5 + comm_size)
330     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
331   if (action.size() > 5 + comm_size)
332     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
333
334   for (unsigned int i = 0; i < comm_size; i++) {
335     (*sendcounts)[i] = std::stoi(action[i + 2]);
336   }
337   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
338   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
339 }
340
341 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
342 {
343   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
344        0 reduceScatter 275427 275427 275427 204020 11346849 0
345      where:
346        1) The first four values after the name of the action declare the recvcounts array
347        2) The value 11346849 is the amount of instructions
348        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
349   */
350   comm_size = MPI_COMM_WORLD->size();
351   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
352   comp_size  = parse_double(action[2 + comm_size]);
353   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
354   if (action.size() > 3 + comm_size)
355     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
356
357   for (unsigned int i = 0; i < comm_size; i++) {
358     recvcounts->push_back(std::stoi(action[i + 2]));
359   }
360   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
361 }
362
363 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
364 {
365   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
366         0 allToAllV 100 1 7 10 12 100 1 70 10 5
367      where:
368       1) 100 is the size of the send buffer *sizeof(int),
369       2) 1 7 10 12 is the sendcounts array
370       3) 100*sizeof(int) is the size of the receiver buffer
371       4)  1 70 10 5 is the recvcounts array
372   */
373   comm_size = MPI_COMM_WORLD->size();
374   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
375   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
376   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
377   senddisps  = std::vector<int>(comm_size, 0);
378   recvdisps  = std::vector<int>(comm_size, 0);
379
380   if (action.size() > 5 + 2 * comm_size)
381     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
382   if (action.size() > 5 + 2 * comm_size)
383     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
384
385   send_buf_size = parse_double(action[2]);
386   recv_buf_size = parse_double(action[3 + comm_size]);
387   for (unsigned int i = 0; i < comm_size; i++) {
388     (*sendcounts)[i] = std::stoi(action[3 + i]);
389     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
390   }
391   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
392   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
393 }
394
395 template<class T>
396 void ReplayAction<T>::execute(simgrid::xbt::ReplayAction& action)
397 {
398   // Needs to be re-initialized for every action, hence here
399   double start_time = smpi_process()->simulated_elapsed();
400   args.parse(action, name);
401   kernel(action);
402   if (name != "Init")
403     log_timed_action(action, start_time);
404 }
405
406 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
407 {
408   std::string s = boost::algorithm::join(action, " ");
409   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
410   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
411   req_storage.remove(request);
412
413   if (request == MPI_REQUEST_NULL) {
414     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
415      * return.*/
416     return;
417   }
418
419   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
420
421   // Must be taken before Request::wait() since the request may be set to
422   // MPI_REQUEST_NULL by Request::wait!
423   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
424   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
425   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
426
427   MPI_Status status;
428   Request::wait(&request, &status);
429
430   TRACE_smpi_comm_out(rank);
431   if (is_wait_for_receive)
432     TRACE_smpi_recv(args.src, args.dst, args.tag);
433   }
434
435   void SendAction::kernel(simgrid::xbt::ReplayAction& action)
436   {
437     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
438
439     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
440                                                                              args.tag, Datatype::encode(args.datatype1)));
441     if (not TRACE_smpi_view_internals())
442       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
443
444     if (name == "send") {
445       Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
446     } else if (name == "Isend") {
447       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
448       req_storage.add(request);
449     } else {
450       xbt_die("Don't know this action, %s", name.c_str());
451     }
452
453     TRACE_smpi_comm_out(my_proc_id);
454   }
455
456   void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
457   {
458     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
459
460     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
461                                                                              args.tag, Datatype::encode(args.datatype1)));
462
463     MPI_Status status;
464     // unknown size from the receiver point of view
465     if (args.size <= 0.0) {
466       Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
467       args.size = status.count;
468     }
469
470     if (name == "recv") {
471       Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
472     } else if (name == "Irecv") {
473       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
474       req_storage.add(request);
475     }
476
477     TRACE_smpi_comm_out(my_proc_id);
478     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
479     if (name == "recv" && not TRACE_smpi_view_internals()) {
480       TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
481     }
482   }
483
484   void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
485   {
486     TRACE_smpi_computing_in(my_proc_id, args.flops);
487     smpi_execute_flops(args.flops);
488     TRACE_smpi_computing_out(my_proc_id);
489   }
490
491   void TestAction::kernel(simgrid::xbt::ReplayAction& action)
492   {
493     MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
494     req_storage.remove(request);
495     // if request is null here, this may mean that a previous test has succeeded
496     // Different times in traced application and replayed version may lead to this
497     // In this case, ignore the extra calls.
498     if (request != MPI_REQUEST_NULL) {
499       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
500
501       MPI_Status status;
502       int flag = Request::test(&request, &status);
503
504       XBT_DEBUG("MPI_Test result: %d", flag);
505       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
506        * nullptr.*/
507       if (request == MPI_REQUEST_NULL)
508         req_storage.addNullRequest(args.src, args.dst, args.tag);
509       else
510         req_storage.add(request);
511
512       TRACE_smpi_comm_out(my_proc_id);
513     }
514   }
515
516   void InitAction::kernel(simgrid::xbt::ReplayAction& action)
517   {
518     CHECK_ACTION_PARAMS(action, 0, 1)
519     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
520                                            : MPI_BYTE;  // default TAU datatype
521
522     /* start a simulated timer */
523     smpi_process()->simulated_start();
524   }
525
526   void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
527   {
528     /* nothing to do */
529   }
530
531   void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
532   {
533     const unsigned int count_requests = req_storage.size();
534
535     if (count_requests > 0) {
536       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
537       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
538       std::vector<MPI_Request> reqs;
539       req_storage.get_requests(reqs);
540       for (const auto& req : reqs) {
541         if (req && (req->flags() & MPI_REQ_RECV)) {
542           sender_receiver.push_back({req->src(), req->dst()});
543         }
544       }
545       MPI_Status status[count_requests];
546       Request::waitall(count_requests, &(reqs.data())[0], status);
547       req_storage.get_store().clear();
548
549       for (auto& pair : sender_receiver) {
550         TRACE_smpi_recv(pair.first, pair.second, 0);
551       }
552       TRACE_smpi_comm_out(my_proc_id);
553     }
554   }
555
556   void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
557   {
558     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
559     Colls::barrier(MPI_COMM_WORLD);
560     TRACE_smpi_comm_out(my_proc_id);
561   }
562
563   void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
564   {
565     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
566                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
567                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
568
569     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
570
571     TRACE_smpi_comm_out(my_proc_id);
572   }
573
574   void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
575   {
576     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
577                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
578                                                       args.comp_size, args.comm_size, -1,
579                                                       Datatype::encode(args.datatype1), ""));
580
581     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
582         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
583     smpi_execute_flops(args.comp_size);
584
585     TRACE_smpi_comm_out(my_proc_id);
586   }
587
588   void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
589   {
590     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
591                                                                                 Datatype::encode(args.datatype1), ""));
592
593     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
594         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
595     smpi_execute_flops(args.comp_size);
596
597     TRACE_smpi_comm_out(my_proc_id);
598   }
599
600   void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
601   {
602     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
603                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
604                                                     Datatype::encode(args.datatype1),
605                                                     Datatype::encode(args.datatype2)));
606
607     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
608                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
609                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
610
611     TRACE_smpi_comm_out(my_proc_id);
612   }
613
614   void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
615   {
616     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
617                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
618
619     if (name == "gather") {
620       int rank = MPI_COMM_WORLD->rank();
621       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
622                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
623     }
624     else
625       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
626                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
627
628     TRACE_smpi_comm_out(my_proc_id);
629   }
630
631   void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
632   {
633     int rank = MPI_COMM_WORLD->rank();
634
635     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
636                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
637                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
638
639     if (name == "gatherV") {
640       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
641                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
642                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
643     }
644     else {
645       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
646                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
647                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
648     }
649
650     TRACE_smpi_comm_out(my_proc_id);
651   }
652
653   void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
654   {
655     int rank = MPI_COMM_WORLD->rank();
656     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
657                                                                           Datatype::encode(args.datatype1),
658                                                                           Datatype::encode(args.datatype2)));
659
660     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
661                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
662
663     TRACE_smpi_comm_out(my_proc_id);
664   }
665
666   void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
667   {
668     int rank = MPI_COMM_WORLD->rank();
669     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
670           nullptr, Datatype::encode(args.datatype1),
671           Datatype::encode(args.datatype2)));
672
673     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
674                     args.sendcounts->data(), args.disps.data(), args.datatype1,
675                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
676                     MPI_COMM_WORLD);
677
678     TRACE_smpi_comm_out(my_proc_id);
679   }
680
681   void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
682   {
683     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
684                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
685                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
686                                                          Datatype::encode(args.datatype1)));
687
688     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
689                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
690                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
691
692     smpi_execute_flops(args.comp_size);
693     TRACE_smpi_comm_out(my_proc_id);
694   }
695
696   void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
697   {
698     TRACE_smpi_comm_in(my_proc_id, __func__,
699                        new simgrid::instr::VarCollTIData(
700                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
701                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
702
703     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
704                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
705
706     TRACE_smpi_comm_out(my_proc_id);
707   }
708 } // Replay Namespace
709 }} // namespace simgrid::smpi
710
711 std::vector<simgrid::smpi::replay::RequestStorage> storage;
712 /** @brief Only initialize the replay, don't do it for real */
713 void smpi_replay_init(int* argc, char*** argv)
714 {
715   simgrid::smpi::Process::init(argc, argv);
716   smpi_process()->mark_as_initialized();
717   smpi_process()->set_replaying(true);
718
719   int my_proc_id = simgrid::s4u::this_actor::get_pid();
720   storage.resize(smpi_process_count());
721
722   TRACE_smpi_init(my_proc_id);
723   TRACE_smpi_computing_init(my_proc_id);
724   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
725   TRACE_smpi_comm_out(my_proc_id);
726   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
727   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
728   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
729   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
730   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
731   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
732   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
733   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
734   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
735   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
736   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
737   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
738   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
739   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
740   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
741   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
742   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
743   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
744   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
745   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
746   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
747   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
748   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
749   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
750   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
751   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
752
753   //if we have a delayed start, sleep here.
754   if(*argc>2){
755     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
756     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
757     smpi_execute_flops(value);
758   } else {
759     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
760     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
761     smpi_execute_flops(0.0);
762   }
763 }
764
765 /** @brief actually run the replay after initialization */
766 void smpi_replay_main(int* argc, char*** argv)
767 {
768   static int active_processes = 0;
769   active_processes++;
770   simgrid::xbt::replay_runner(*argc, *argv);
771
772   /* and now, finalize everything */
773   /* One active process will stop. Decrease the counter*/
774   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
775   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
776   if (count_requests > 0) {
777     MPI_Request requests[count_requests];
778     MPI_Status status[count_requests];
779     unsigned int i=0;
780
781     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
782       requests[i] = pair.second;
783       i++;
784     }
785     simgrid::smpi::Request::waitall(count_requests, requests, status);
786   }
787   active_processes--;
788
789   if(active_processes==0){
790     /* Last process alive speaking: end the simulated timer */
791     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
792     smpi_free_replay_tmp_buffers();
793   }
794
795   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
796                      new simgrid::instr::NoOpTIData("finalize"));
797
798   smpi_process()->finalize();
799
800   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
801   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
802 }
803
804 /** @brief chain a replay initialization and a replay start */
805 void smpi_replay_run(int* argc, char*** argv)
806 {
807   smpi_replay_init(argc, argv);
808   smpi_replay_main(argc, argv);
809 }