Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(std::string string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 2)
171   size = parse_double(action[2]);
172   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173   if (action.size() > 4)
174     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
175 }
176
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
178 {
179   CHECK_ACTION_PARAMS(action, 2, 2)
180   comm_size = parse_double(action[2]);
181   comp_size = parse_double(action[3]);
182   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
183   if (action.size() > 5)
184     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
185 }
186
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
188 {
189   CHECK_ACTION_PARAMS(action, 2, 1)
190   comm_size = parse_double(action[2]);
191   comp_size = parse_double(action[3]);
192   if (action.size() > 4)
193     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
194 }
195
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
197 {
198   CHECK_ACTION_PARAMS(action, 2, 1)
199   comm_size = MPI_COMM_WORLD->size();
200   send_size = parse_double(action[2]);
201   recv_size = parse_double(action[3]);
202
203   if (action.size() > 4)
204     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205   if (action.size() > 5)
206     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
207 }
208
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
210 {
211   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
212         0 gather 68 68 0 0 0
213       where:
214         1) 68 is the sendcounts
215         2) 68 is the recvcounts
216         3) 0 is the root node
217         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
219   */
220   CHECK_ACTION_PARAMS(action, 2, 3)
221   comm_size = MPI_COMM_WORLD->size();
222   send_size = parse_double(action[2]);
223   recv_size = parse_double(action[3]);
224
225   if (name == "gather") {
226     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
227     if (action.size() > 5)
228       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229     if (action.size() > 6)
230       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
231   } else {
232     if (action.size() > 4)
233       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
234     if (action.size() > 5)
235       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
236   }
237 }
238
239 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
240 {
241   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
242        0 gather 68 68 10 10 10 0 0 0
243      where:
244        1) 68 is the sendcount
245        2) 68 10 10 10 is the recvcounts
246        3) 0 is the root node
247        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
249   */
250   comm_size = MPI_COMM_WORLD->size();
251   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
252   send_size  = parse_double(action[2]);
253   disps      = std::vector<int>(comm_size, 0);
254   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
255
256   if (name == "gatherv") {
257     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
258     if (action.size() > 4 + comm_size)
259       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
260     if (action.size() > 5 + comm_size)
261       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
262   } else {
263     int datatype_index = 0;
264     int disp_index     = 0;
265     /* The 3 comes from "0 gather <sendcount>", which must always be present.
266      * The + comm_size is the recvcounts array, which must also be present
267      */
268     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
269       datatype_index = 3 + comm_size;
270       disp_index     = datatype_index + 1;
271       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
272       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
273     } else if (action.size() >
274                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
275       disp_index = 3 + comm_size;
276     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
277       datatype_index = 3 + comm_size;
278       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
279       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
280     }
281
282     if (disp_index != 0) {
283       for (unsigned int i = 0; i < comm_size; i++)
284         disps[i]          = std::stoi(action[disp_index + i]);
285     }
286   }
287
288   for (unsigned int i = 0; i < comm_size; i++) {
289     (*recvcounts)[i] = std::stoi(action[i + 3]);
290   }
291   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
292 }
293
294 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
295 {
296   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
297         0 gather 68 68 0 0 0
298       where:
299         1) 68 is the sendcounts
300         2) 68 is the recvcounts
301         3) 0 is the root node
302         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304   */
305   CHECK_ACTION_PARAMS(action, 2, 3)
306   comm_size = MPI_COMM_WORLD->size();
307   send_size = parse_double(action[2]);
308   recv_size = parse_double(action[3]);
309   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
310   if (action.size() > 5)
311     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
312   if (action.size() > 6)
313     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
314 }
315
316 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
317 {
318   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
319      0 gather 68 10 10 10 68 0 0 0
320       where:
321       1) 68 10 10 10 is the sendcounts
322       2) 68 is the recvcount
323       3) 0 is the root node
324       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
326   */
327   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
328   recv_size  = parse_double(action[2 + comm_size]);
329   disps      = std::vector<int>(comm_size, 0);
330   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
331
332   if (action.size() > 5 + comm_size)
333     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
334   if (action.size() > 5 + comm_size)
335     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
336
337   for (unsigned int i = 0; i < comm_size; i++) {
338     (*sendcounts)[i] = std::stoi(action[i + 2]);
339   }
340   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
341   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
342 }
343
344 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
345 {
346   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
347        0 reducescatter 275427 275427 275427 204020 11346849 0
348      where:
349        1) The first four values after the name of the action declare the recvcounts array
350        2) The value 11346849 is the amount of instructions
351        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
352   */
353   comm_size = MPI_COMM_WORLD->size();
354   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
355   comp_size  = parse_double(action[2 + comm_size]);
356   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
357   if (action.size() > 3 + comm_size)
358     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
359
360   for (unsigned int i = 0; i < comm_size; i++) {
361     recvcounts->push_back(std::stoi(action[i + 2]));
362   }
363   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
364 }
365
366 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
367 {
368   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
369         0 alltoallv 100 1 7 10 12 100 1 70 10 5
370      where:
371       1) 100 is the size of the send buffer *sizeof(int),
372       2) 1 7 10 12 is the sendcounts array
373       3) 100*sizeof(int) is the size of the receiver buffer
374       4)  1 70 10 5 is the recvcounts array
375   */
376   comm_size = MPI_COMM_WORLD->size();
377   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
378   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380   senddisps  = std::vector<int>(comm_size, 0);
381   recvdisps  = std::vector<int>(comm_size, 0);
382
383   if (action.size() > 5 + 2 * comm_size)
384     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
385   if (action.size() > 5 + 2 * comm_size)
386     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
387
388   send_buf_size = parse_double(action[2]);
389   recv_buf_size = parse_double(action[3 + comm_size]);
390   for (unsigned int i = 0; i < comm_size; i++) {
391     (*sendcounts)[i] = std::stoi(action[3 + i]);
392     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
393   }
394   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
395   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
396 }
397
398 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
399 {
400   std::string s = boost::algorithm::join(action, " ");
401   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
402   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
403   req_storage.remove(request);
404
405   if (request == MPI_REQUEST_NULL) {
406     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
407      * return.*/
408     return;
409   }
410
411   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
412
413   // Must be taken before Request::wait() since the request may be set to
414   // MPI_REQUEST_NULL by Request::wait!
415   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
416   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
417   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
418
419   MPI_Status status;
420   Request::wait(&request, &status);
421
422   TRACE_smpi_comm_out(rank);
423   if (is_wait_for_receive)
424     TRACE_smpi_recv(args.src, args.dst, args.tag);
425 }
426
427 void SendAction::kernel(simgrid::xbt::ReplayAction& action)
428 {
429   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
430
431   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
432         args.tag, Datatype::encode(args.datatype1)));
433   if (not TRACE_smpi_view_internals())
434     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
435
436   if (name == "send") {
437     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
438   } else if (name == "isend") {
439     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
440     req_storage.add(request);
441   } else {
442     xbt_die("Don't know this action, %s", name.c_str());
443   }
444
445   TRACE_smpi_comm_out(my_proc_id);
446 }
447
448 void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
449 {
450   int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
451
452   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
453         args.tag, Datatype::encode(args.datatype1)));
454
455   MPI_Status status;
456   // unknown size from the receiver point of view
457   if (args.size <= 0.0) {
458     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
459     args.size = status.count;
460   }
461
462   if (name == "recv") {
463     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
464   } else if (name == "irecv") {
465     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
466     req_storage.add(request);
467   }
468
469   TRACE_smpi_comm_out(my_proc_id);
470   // TODO: Check why this was only activated in the "recv" case and not in the "irecv" case
471   if (name == "recv" && not TRACE_smpi_view_internals()) {
472     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
473   }
474 }
475
476 void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
477 {
478   TRACE_smpi_computing_in(my_proc_id, args.flops);
479   smpi_execute_flops(args.flops);
480   TRACE_smpi_computing_out(my_proc_id);
481 }
482
483 void TestAction::kernel(simgrid::xbt::ReplayAction& action)
484 {
485   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
486   req_storage.remove(request);
487   // if request is null here, this may mean that a previous test has succeeded
488   // Different times in traced application and replayed version may lead to this
489   // In this case, ignore the extra calls.
490   if (request != MPI_REQUEST_NULL) {
491     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
492
493     MPI_Status status;
494     int flag = Request::test(&request, &status);
495
496     XBT_DEBUG("MPI_Test result: %d", flag);
497     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
498      * nullptr.*/
499     if (request == MPI_REQUEST_NULL)
500       req_storage.addNullRequest(args.src, args.dst, args.tag);
501     else
502       req_storage.add(request);
503
504     TRACE_smpi_comm_out(my_proc_id);
505   }
506 }
507
508 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
509 {
510   CHECK_ACTION_PARAMS(action, 0, 1)
511     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
512     : MPI_BYTE;  // default TAU datatype
513
514   /* start a simulated timer */
515   smpi_process()->simulated_start();
516 }
517
518 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
519 {
520   /* nothing to do */
521 }
522
523 void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
524 {
525   const unsigned int count_requests = req_storage.size();
526
527   if (count_requests > 0) {
528     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
529     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
530     std::vector<MPI_Request> reqs;
531     req_storage.get_requests(reqs);
532     for (const auto& req : reqs) {
533       if (req && (req->flags() & MPI_REQ_RECV)) {
534         sender_receiver.push_back({req->src(), req->dst()});
535       }
536     }
537     MPI_Status status[count_requests];
538     Request::waitall(count_requests, &(reqs.data())[0], status);
539     req_storage.get_store().clear();
540
541     for (auto& pair : sender_receiver) {
542       TRACE_smpi_recv(pair.first, pair.second, 0);
543     }
544     TRACE_smpi_comm_out(my_proc_id);
545   }
546 }
547
548 void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
549 {
550   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
551   Colls::barrier(MPI_COMM_WORLD);
552   TRACE_smpi_comm_out(my_proc_id);
553 }
554
555 void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
556 {
557   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
558       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
559         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
560
561   Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
562
563   TRACE_smpi_comm_out(my_proc_id);
564 }
565
566 void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
567 {
568   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
569       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
570         args.comp_size, args.comm_size, -1,
571         Datatype::encode(args.datatype1), ""));
572
573   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
574       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
575   smpi_execute_flops(args.comp_size);
576
577   TRACE_smpi_comm_out(my_proc_id);
578 }
579
580 void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
581 {
582   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
583         Datatype::encode(args.datatype1), ""));
584
585   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
586       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
587   smpi_execute_flops(args.comp_size);
588
589   TRACE_smpi_comm_out(my_proc_id);
590 }
591
592 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
593 {
594   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
595       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
596         Datatype::encode(args.datatype1),
597         Datatype::encode(args.datatype2)));
598
599   Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
600       args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
601       args.recv_size, args.datatype2, MPI_COMM_WORLD);
602
603   TRACE_smpi_comm_out(my_proc_id);
604 }
605
606 void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
607 {
608   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
609         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
610
611   if (name == "gather") {
612     int rank = MPI_COMM_WORLD->rank();
613     Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
614         (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
615   }
616   else
617     Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
618         recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
619
620   TRACE_smpi_comm_out(my_proc_id);
621 }
622
623 void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
624 {
625   int rank = MPI_COMM_WORLD->rank();
626
627   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
628         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
629         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
630
631   if (name == "gatherv") {
632     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
633         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
634         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
635   }
636   else {
637     Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
638         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
639         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
640   }
641
642   TRACE_smpi_comm_out(my_proc_id);
643 }
644
645 void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
646 {
647   int rank = MPI_COMM_WORLD->rank();
648   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
649         Datatype::encode(args.datatype1),
650         Datatype::encode(args.datatype2)));
651
652   Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
653       (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
654
655   TRACE_smpi_comm_out(my_proc_id);
656 }
657
658 void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
659 {
660   int rank = MPI_COMM_WORLD->rank();
661   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
662         nullptr, Datatype::encode(args.datatype1),
663         Datatype::encode(args.datatype2)));
664
665   Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
666       args.sendcounts->data(), args.disps.data(), args.datatype1,
667       recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
668       MPI_COMM_WORLD);
669
670   TRACE_smpi_comm_out(my_proc_id);
671 }
672
673 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
674 {
675   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
676       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
677         std::to_string(args.comp_size), /* ugly hack to print comp_size */
678         Datatype::encode(args.datatype1)));
679
680   Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
681       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
682       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
683
684   smpi_execute_flops(args.comp_size);
685   TRACE_smpi_comm_out(my_proc_id);
686 }
687
688 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
689 {
690   TRACE_smpi_comm_in(my_proc_id, __func__,
691       new simgrid::instr::VarCollTIData(
692         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
693         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
694
695   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
696       recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
697
698   TRACE_smpi_comm_out(my_proc_id);
699 }
700 } // Replay Namespace
701 }} // namespace simgrid::smpi
702
703 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
704 /** @brief Only initialize the replay, don't do it for real */
705 void smpi_replay_init(int* argc, char*** argv)
706 {
707   simgrid::smpi::ActorExt::init(argc, argv);
708   smpi_process()->mark_as_initialized();
709   smpi_process()->set_replaying(true);
710
711   int my_proc_id = simgrid::s4u::this_actor::get_pid();
712
713   TRACE_smpi_init(my_proc_id);
714   TRACE_smpi_computing_init(my_proc_id);
715   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
716   TRACE_smpi_comm_out(my_proc_id);
717   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
718   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
719   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
720   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
721   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
722   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
723   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
724   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
725   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
726   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
727   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
728   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
729   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
730   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
731   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
732   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
733   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
734   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
735   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
736   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
737   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
738   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
739   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
740   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
741   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
742   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
743
744   //if we have a delayed start, sleep here.
745   if(*argc>2){
746     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
747     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
748     smpi_execute_flops(value);
749   } else {
750     // Wait for the other actors to initialize also
751     simgrid::s4u::this_actor::yield();
752   }
753 }
754
755 /** @brief actually run the replay after initialization */
756 void smpi_replay_main(int* argc, char*** argv)
757 {
758   static int active_processes = 0;
759   active_processes++;
760   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
761   simgrid::xbt::replay_runner(*argc, *argv);
762
763   /* and now, finalize everything */
764   /* One active process will stop. Decrease the counter*/
765   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
766   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
767   if (count_requests > 0) {
768     MPI_Request requests[count_requests];
769     MPI_Status status[count_requests];
770     unsigned int i=0;
771
772     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
773       requests[i] = pair.second;
774       i++;
775     }
776     simgrid::smpi::Request::waitall(count_requests, requests, status);
777   }
778   active_processes--;
779
780   if(active_processes==0){
781     /* Last process alive speaking: end the simulated timer */
782     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
783     smpi_free_replay_tmp_buffers();
784   }
785
786   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
787                      new simgrid::instr::NoOpTIData("finalize"));
788
789   smpi_process()->finalize();
790
791   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
792   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
793 }
794
795 /** @brief chain a replay initialization and a replay start */
796 void smpi_replay_run(int* argc, char*** argv)
797 {
798   smpi_replay_init(argc, argv);
799   smpi_replay_main(argc, argv);
800 }