Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of framagit.org:simgrid/simgrid
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(std::string string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
169 {
170   CHECK_ACTION_PARAMS(action, 1, 2)
171   size = parse_double(action[2]);
172   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173   if (action.size() > 4)
174     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
175 }
176
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
178 {
179   CHECK_ACTION_PARAMS(action, 2, 2)
180   comm_size = parse_double(action[2]);
181   comp_size = parse_double(action[3]);
182   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
183   if (action.size() > 5)
184     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
185 }
186
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
188 {
189   CHECK_ACTION_PARAMS(action, 2, 1)
190   comm_size = parse_double(action[2]);
191   comp_size = parse_double(action[3]);
192   if (action.size() > 4)
193     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
194 }
195
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
197 {
198   CHECK_ACTION_PARAMS(action, 2, 1)
199   comm_size = MPI_COMM_WORLD->size();
200   send_size = parse_double(action[2]);
201   recv_size = parse_double(action[3]);
202
203   if (action.size() > 4)
204     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205   if (action.size() > 5)
206     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
207 }
208
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
210 {
211   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
212         0 gather 68 68 0 0 0
213       where:
214         1) 68 is the sendcounts
215         2) 68 is the recvcounts
216         3) 0 is the root node
217         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
219   */
220   CHECK_ACTION_PARAMS(action, 2, 3)
221   comm_size = MPI_COMM_WORLD->size();
222   send_size = parse_double(action[2]);
223   recv_size = parse_double(action[3]);
224
225   if (name == "gather") {
226     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
227     if (action.size() > 5)
228       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229     if (action.size() > 6)
230       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
231   } else {
232     if (action.size() > 4)
233       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
234     if (action.size() > 5)
235       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
236   }
237 }
238
239 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
240 {
241   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
242        0 gather 68 68 10 10 10 0 0 0
243      where:
244        1) 68 is the sendcount
245        2) 68 10 10 10 is the recvcounts
246        3) 0 is the root node
247        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
249   */
250   comm_size = MPI_COMM_WORLD->size();
251   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
252   send_size  = parse_double(action[2]);
253   disps      = std::vector<int>(comm_size, 0);
254   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
255
256   if (name == "gatherv") {
257     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
258     if (action.size() > 4 + comm_size)
259       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
260     if (action.size() > 5 + comm_size)
261       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
262   } else {
263     int datatype_index = 0;
264     int disp_index     = 0;
265     /* The 3 comes from "0 gather <sendcount>", which must always be present.
266      * The + comm_size is the recvcounts array, which must also be present
267      */
268     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
269       datatype_index = 3 + comm_size;
270       disp_index     = datatype_index + 1;
271       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
272       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
273     } else if (action.size() >
274                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
275       disp_index = 3 + comm_size;
276     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
277       datatype_index = 3 + comm_size;
278       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
279       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
280     }
281
282     if (disp_index != 0) {
283       for (unsigned int i = 0; i < comm_size; i++)
284         disps[i]          = std::stoi(action[disp_index + i]);
285     }
286   }
287
288   for (unsigned int i = 0; i < comm_size; i++) {
289     (*recvcounts)[i] = std::stoi(action[i + 3]);
290   }
291   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
292 }
293
294 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
295 {
296   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
297         0 gather 68 68 0 0 0
298       where:
299         1) 68 is the sendcounts
300         2) 68 is the recvcounts
301         3) 0 is the root node
302         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
304   */
305   CHECK_ACTION_PARAMS(action, 2, 3)
306   comm_size = MPI_COMM_WORLD->size();
307   send_size = parse_double(action[2]);
308   recv_size = parse_double(action[3]);
309   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
310   if (action.size() > 5)
311     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
312   if (action.size() > 6)
313     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
314 }
315
316 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
317 {
318   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
319      0 gather 68 10 10 10 68 0 0 0
320       where:
321       1) 68 10 10 10 is the sendcounts
322       2) 68 is the recvcount
323       3) 0 is the root node
324       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
326   */
327   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
328   recv_size  = parse_double(action[2 + comm_size]);
329   disps      = std::vector<int>(comm_size, 0);
330   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
331
332   if (action.size() > 5 + comm_size)
333     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
334   if (action.size() > 5 + comm_size)
335     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
336
337   for (unsigned int i = 0; i < comm_size; i++) {
338     (*sendcounts)[i] = std::stoi(action[i + 2]);
339   }
340   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
341   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
342 }
343
344 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
345 {
346   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
347        0 reducescatter 275427 275427 275427 204020 11346849 0
348      where:
349        1) The first four values after the name of the action declare the recvcounts array
350        2) The value 11346849 is the amount of instructions
351        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
352   */
353   comm_size = MPI_COMM_WORLD->size();
354   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
355   comp_size  = parse_double(action[2 + comm_size]);
356   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
357   if (action.size() > 3 + comm_size)
358     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
359
360   for (unsigned int i = 0; i < comm_size; i++) {
361     recvcounts->push_back(std::stoi(action[i + 2]));
362   }
363   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
364 }
365
366 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
367 {
368   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
369         0 alltoallv 100 1 7 10 12 100 1 70 10 5
370      where:
371       1) 100 is the size of the send buffer *sizeof(int),
372       2) 1 7 10 12 is the sendcounts array
373       3) 100*sizeof(int) is the size of the receiver buffer
374       4)  1 70 10 5 is the recvcounts array
375   */
376   comm_size = MPI_COMM_WORLD->size();
377   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
378   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380   senddisps  = std::vector<int>(comm_size, 0);
381   recvdisps  = std::vector<int>(comm_size, 0);
382
383   if (action.size() > 5 + 2 * comm_size)
384     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
385   if (action.size() > 5 + 2 * comm_size)
386     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
387
388   send_buf_size = parse_double(action[2]);
389   recv_buf_size = parse_double(action[3 + comm_size]);
390   for (unsigned int i = 0; i < comm_size; i++) {
391     (*sendcounts)[i] = std::stoi(action[3 + i]);
392     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
393   }
394   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
395   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
396 }
397
398 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
399 {
400   std::string s = boost::algorithm::join(action, " ");
401   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
402   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
403   req_storage.remove(request);
404
405   if (request == MPI_REQUEST_NULL) {
406     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
407      * return.*/
408     return;
409   }
410
411   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
412
413   // Must be taken before Request::wait() since the request may be set to
414   // MPI_REQUEST_NULL by Request::wait!
415   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
416   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
417   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
418
419   MPI_Status status;
420   Request::wait(&request, &status);
421
422   TRACE_smpi_comm_out(rank);
423   if (is_wait_for_receive)
424     TRACE_smpi_recv(args.src, args.dst, args.tag);
425 }
426
427 void SendAction::kernel(simgrid::xbt::ReplayAction& action)
428 {
429   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
430
431   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
432         args.tag, Datatype::encode(args.datatype1)));
433   if (not TRACE_smpi_view_internals())
434     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
435
436   if (name == "send") {
437     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
438   } else if (name == "isend") {
439     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
440     req_storage.add(request);
441   } else {
442     xbt_die("Don't know this action, %s", name.c_str());
443   }
444
445   TRACE_smpi_comm_out(my_proc_id);
446 }
447
448 void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
449 {
450   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
451         args.tag, Datatype::encode(args.datatype1)));
452
453   MPI_Status status;
454   // unknown size from the receiver point of view
455   if (args.size <= 0.0) {
456     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
457     args.size = status.count;
458   }
459
460   if (name == "recv") {
461     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
462   } else if (name == "irecv") {
463     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
464     req_storage.add(request);
465   }
466
467   TRACE_smpi_comm_out(my_proc_id);
468   // TODO: Check why this was only activated in the "recv" case and not in the "irecv" case
469   if (name == "recv" && not TRACE_smpi_view_internals()) {
470     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
471     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
472   }
473 }
474
475 void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
476 {
477   TRACE_smpi_computing_in(my_proc_id, args.flops);
478   smpi_execute_flops(args.flops);
479   TRACE_smpi_computing_out(my_proc_id);
480 }
481
482 void TestAction::kernel(simgrid::xbt::ReplayAction& action)
483 {
484   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
485   req_storage.remove(request);
486   // if request is null here, this may mean that a previous test has succeeded
487   // Different times in traced application and replayed version may lead to this
488   // In this case, ignore the extra calls.
489   if (request != MPI_REQUEST_NULL) {
490     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
491
492     MPI_Status status;
493     int flag = Request::test(&request, &status);
494
495     XBT_DEBUG("MPI_Test result: %d", flag);
496     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
497      * nullptr.*/
498     if (request == MPI_REQUEST_NULL)
499       req_storage.addNullRequest(args.src, args.dst, args.tag);
500     else
501       req_storage.add(request);
502
503     TRACE_smpi_comm_out(my_proc_id);
504   }
505 }
506
507 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
508 {
509   CHECK_ACTION_PARAMS(action, 0, 1)
510     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
511     : MPI_BYTE;  // default TAU datatype
512
513   /* start a simulated timer */
514   smpi_process()->simulated_start();
515 }
516
517 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
518 {
519   /* nothing to do */
520 }
521
522 void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
523 {
524   const unsigned int count_requests = req_storage.size();
525
526   if (count_requests > 0) {
527     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
528     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
529     std::vector<MPI_Request> reqs;
530     req_storage.get_requests(reqs);
531     for (const auto& req : reqs) {
532       if (req && (req->flags() & MPI_REQ_RECV)) {
533         sender_receiver.push_back({req->src(), req->dst()});
534       }
535     }
536     MPI_Status status[count_requests];
537     Request::waitall(count_requests, &(reqs.data())[0], status);
538     req_storage.get_store().clear();
539
540     for (auto& pair : sender_receiver) {
541       TRACE_smpi_recv(pair.first, pair.second, 0);
542     }
543     TRACE_smpi_comm_out(my_proc_id);
544   }
545 }
546
547 void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
548 {
549   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
550   Colls::barrier(MPI_COMM_WORLD);
551   TRACE_smpi_comm_out(my_proc_id);
552 }
553
554 void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
555 {
556   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
557       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
558         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
559
560   Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
561
562   TRACE_smpi_comm_out(my_proc_id);
563 }
564
565 void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
566 {
567   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
568       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
569         args.comp_size, args.comm_size, -1,
570         Datatype::encode(args.datatype1), ""));
571
572   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
573       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
574   smpi_execute_flops(args.comp_size);
575
576   TRACE_smpi_comm_out(my_proc_id);
577 }
578
579 void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
580 {
581   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
582         Datatype::encode(args.datatype1), ""));
583
584   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
585       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
586   smpi_execute_flops(args.comp_size);
587
588   TRACE_smpi_comm_out(my_proc_id);
589 }
590
591 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
592 {
593   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
594       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
595         Datatype::encode(args.datatype1),
596         Datatype::encode(args.datatype2)));
597
598   Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
599       args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
600       args.recv_size, args.datatype2, MPI_COMM_WORLD);
601
602   TRACE_smpi_comm_out(my_proc_id);
603 }
604
605 void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
606 {
607   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
608         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
609
610   if (name == "gather") {
611     int rank = MPI_COMM_WORLD->rank();
612     Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
613         (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
614   }
615   else
616     Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
617         recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
618
619   TRACE_smpi_comm_out(my_proc_id);
620 }
621
622 void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
623 {
624   int rank = MPI_COMM_WORLD->rank();
625
626   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
627         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
628         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
629
630   if (name == "gatherv") {
631     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
632         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
633         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
634   }
635   else {
636     Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
637         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
638         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
639   }
640
641   TRACE_smpi_comm_out(my_proc_id);
642 }
643
644 void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
645 {
646   int rank = MPI_COMM_WORLD->rank();
647   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
648         Datatype::encode(args.datatype1),
649         Datatype::encode(args.datatype2)));
650
651   Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
652       (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
653
654   TRACE_smpi_comm_out(my_proc_id);
655 }
656
657 void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
658 {
659   int rank = MPI_COMM_WORLD->rank();
660   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
661         nullptr, Datatype::encode(args.datatype1),
662         Datatype::encode(args.datatype2)));
663
664   Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
665       args.sendcounts->data(), args.disps.data(), args.datatype1,
666       recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
667       MPI_COMM_WORLD);
668
669   TRACE_smpi_comm_out(my_proc_id);
670 }
671
672 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
673 {
674   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
675       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
676         std::to_string(args.comp_size), /* ugly hack to print comp_size */
677         Datatype::encode(args.datatype1)));
678
679   Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
680       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
681       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
682
683   smpi_execute_flops(args.comp_size);
684   TRACE_smpi_comm_out(my_proc_id);
685 }
686
687 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
688 {
689   TRACE_smpi_comm_in(my_proc_id, __func__,
690       new simgrid::instr::VarCollTIData(
691         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
692         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
693
694   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
695       recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
696
697   TRACE_smpi_comm_out(my_proc_id);
698 }
699 } // Replay Namespace
700 }} // namespace simgrid::smpi
701
702 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
703 /** @brief Only initialize the replay, don't do it for real */
704 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
705 {
706   if (not smpi_process()->initializing()){
707     simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
708     simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
709     simgrid::smpi::ActorExt::init();
710   }
711   smpi_process()->mark_as_initialized();
712   smpi_process()->set_replaying(true);
713
714   int my_proc_id = simgrid::s4u::this_actor::get_pid();
715
716   TRACE_smpi_init(my_proc_id);
717   TRACE_smpi_computing_init(my_proc_id);
718   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
719   TRACE_smpi_comm_out(my_proc_id);
720   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
721   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
722   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
723   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
724   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
725   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
726   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
727   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
728   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
729   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
730   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
731   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
732   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
733   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
734   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
735   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
736   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
737   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
738   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
739   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
740   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
741   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
742   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
743   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
744   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
745   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
746
747   //if we have a delayed start, sleep here.
748   if (start_delay_flops > 0) {
749     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
750     smpi_execute_flops(start_delay_flops);
751   } else {
752     // Wait for the other actors to initialize also
753     simgrid::s4u::this_actor::yield();
754   }
755 }
756
757 /** @brief actually run the replay after initialization */
758 void smpi_replay_main(int rank, const char* trace_filename)
759 {
760   static int active_processes = 0;
761   active_processes++;
762   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
763   std::string rank_string                      = std::to_string(rank);
764   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
765
766   /* and now, finalize everything */
767   /* One active process will stop. Decrease the counter*/
768   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
769   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
770   if (count_requests > 0) {
771     MPI_Request requests[count_requests];
772     MPI_Status status[count_requests];
773     unsigned int i=0;
774
775     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
776       requests[i] = pair.second;
777       i++;
778     }
779     simgrid::smpi::Request::waitall(count_requests, requests, status);
780   }
781   active_processes--;
782
783   if(active_processes==0){
784     /* Last process alive speaking: end the simulated timer */
785     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
786     smpi_free_replay_tmp_buffers();
787   }
788
789   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
790                      new simgrid::instr::NoOpTIData("finalize"));
791
792   smpi_process()->finalize();
793
794   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
795   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
796 }
797
798 /** @brief chain a replay initialization and a replay start */
799 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
800 {
801   smpi_replay_init(instance_id, rank, start_delay_flops);
802   smpi_replay_main(rank, trace_filename);
803 }