Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f2f7f22995396a8b7090dc7bccd37038dd74e7c9
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(std::string string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(src, dst, tag), MPI_REQUEST_NULL});
138     }
139 };
140
141 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
142 {
143   CHECK_ACTION_PARAMS(action, 3, 0)
144   src = std::stoi(action[2]);
145   dst = std::stoi(action[3]);
146   tag = std::stoi(action[4]);
147 }
148
149 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
150 {
151   CHECK_ACTION_PARAMS(action, 3, 1)
152   partner = std::stoi(action[2]);
153   tag     = std::stoi(action[3]);
154   size    = parse_double(action[4]);
155   if (action.size() > 5)
156     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
157 }
158
159 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
160 {
161   CHECK_ACTION_PARAMS(action, 1, 0)
162   flops = parse_double(action[2]);
163 }
164
165 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
166 {
167   CHECK_ACTION_PARAMS(action, 1, 2)
168   size = parse_double(action[2]);
169   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
170   if (action.size() > 4)
171     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
172 }
173
174 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
175 {
176   CHECK_ACTION_PARAMS(action, 2, 2)
177   comm_size = parse_double(action[2]);
178   comp_size = parse_double(action[3]);
179   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
180   if (action.size() > 5)
181     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
182 }
183
184 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
185 {
186   CHECK_ACTION_PARAMS(action, 2, 1)
187   comm_size = parse_double(action[2]);
188   comp_size = parse_double(action[3]);
189   if (action.size() > 4)
190     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
191 }
192
193 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
194 {
195   CHECK_ACTION_PARAMS(action, 2, 1)
196   comm_size = MPI_COMM_WORLD->size();
197   send_size = parse_double(action[2]);
198   recv_size = parse_double(action[3]);
199
200   if (action.size() > 4)
201     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
202   if (action.size() > 5)
203     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
204 }
205
206 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
207 {
208   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
209         0 gather 68 68 0 0 0
210       where:
211         1) 68 is the sendcounts
212         2) 68 is the recvcounts
213         3) 0 is the root node
214         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
215         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
216   */
217   CHECK_ACTION_PARAMS(action, 2, 3)
218   comm_size = MPI_COMM_WORLD->size();
219   send_size = parse_double(action[2]);
220   recv_size = parse_double(action[3]);
221
222   if (name == "gather") {
223     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
224     if (action.size() > 5)
225       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
226     if (action.size() > 6)
227       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
228   } else {
229     if (action.size() > 4)
230       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
231     if (action.size() > 5)
232       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
233   }
234 }
235
236 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
237 {
238   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
239        0 gather 68 68 10 10 10 0 0 0
240      where:
241        1) 68 is the sendcount
242        2) 68 10 10 10 is the recvcounts
243        3) 0 is the root node
244        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
245        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
246   */
247   comm_size = MPI_COMM_WORLD->size();
248   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
249   send_size  = parse_double(action[2]);
250   disps      = std::vector<int>(comm_size, 0);
251   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
252
253   if (name == "gatherV") {
254     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
255     if (action.size() > 4 + comm_size)
256       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
257     if (action.size() > 5 + comm_size)
258       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
259   } else {
260     int datatype_index = 0;
261     int disp_index     = 0;
262     /* The 3 comes from "0 gather <sendcount>", which must always be present.
263      * The + comm_size is the recvcounts array, which must also be present
264      */
265     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
266       datatype_index = 3 + comm_size;
267       disp_index     = datatype_index + 1;
268       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
269       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
270     } else if (action.size() >
271                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
272       disp_index = 3 + comm_size;
273     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
274       datatype_index = 3 + comm_size;
275       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
276       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
277     }
278
279     if (disp_index != 0) {
280       for (unsigned int i = 0; i < comm_size; i++)
281         disps[i]          = std::stoi(action[disp_index + i]);
282     }
283   }
284
285   for (unsigned int i = 0; i < comm_size; i++) {
286     (*recvcounts)[i] = std::stoi(action[i + 3]);
287   }
288   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
289 }
290
291 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
292 {
293   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
294         0 gather 68 68 0 0 0
295       where:
296         1) 68 is the sendcounts
297         2) 68 is the recvcounts
298         3) 0 is the root node
299         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
300         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
301   */
302   CHECK_ACTION_PARAMS(action, 2, 3)
303   comm_size = MPI_COMM_WORLD->size();
304   send_size = parse_double(action[2]);
305   recv_size = parse_double(action[3]);
306   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
307   if (action.size() > 5)
308     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
309   if (action.size() > 6)
310     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
311 }
312
313 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
314 {
315   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
316      0 gather 68 10 10 10 68 0 0 0
317       where:
318       1) 68 10 10 10 is the sendcounts
319       2) 68 is the recvcount
320       3) 0 is the root node
321       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
322       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
323   */
324   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
325   recv_size  = parse_double(action[2 + comm_size]);
326   disps      = std::vector<int>(comm_size, 0);
327   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
328
329   if (action.size() > 5 + comm_size)
330     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
331   if (action.size() > 5 + comm_size)
332     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
333
334   for (unsigned int i = 0; i < comm_size; i++) {
335     (*sendcounts)[i] = std::stoi(action[i + 2]);
336   }
337   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
338   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
339 }
340
341 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
342 {
343   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
344        0 reduceScatter 275427 275427 275427 204020 11346849 0
345      where:
346        1) The first four values after the name of the action declare the recvcounts array
347        2) The value 11346849 is the amount of instructions
348        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
349   */
350   comm_size = MPI_COMM_WORLD->size();
351   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
352   comp_size  = parse_double(action[2 + comm_size]);
353   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
354   if (action.size() > 3 + comm_size)
355     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
356
357   for (unsigned int i = 0; i < comm_size; i++) {
358     recvcounts->push_back(std::stoi(action[i + 2]));
359   }
360   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
361 }
362
363 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, std::string name)
364 {
365   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
366         0 allToAllV 100 1 7 10 12 100 1 70 10 5
367      where:
368       1) 100 is the size of the send buffer *sizeof(int),
369       2) 1 7 10 12 is the sendcounts array
370       3) 100*sizeof(int) is the size of the receiver buffer
371       4)  1 70 10 5 is the recvcounts array
372   */
373   comm_size = MPI_COMM_WORLD->size();
374   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
375   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
376   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
377   senddisps  = std::vector<int>(comm_size, 0);
378   recvdisps  = std::vector<int>(comm_size, 0);
379
380   if (action.size() > 5 + 2 * comm_size)
381     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
382   if (action.size() > 5 + 2 * comm_size)
383     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
384
385   send_buf_size = parse_double(action[2]);
386   recv_buf_size = parse_double(action[3 + comm_size]);
387   for (unsigned int i = 0; i < comm_size; i++) {
388     (*sendcounts)[i] = std::stoi(action[3 + i]);
389     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
390   }
391   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
392   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
393 }
394
395 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
396 {
397   std::string s = boost::algorithm::join(action, " ");
398   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
399   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
400   req_storage.remove(request);
401
402   if (request == MPI_REQUEST_NULL) {
403     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
404      * return.*/
405     return;
406   }
407
408   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
409
410   // Must be taken before Request::wait() since the request may be set to
411   // MPI_REQUEST_NULL by Request::wait!
412   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
413   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
414   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
415
416   MPI_Status status;
417   Request::wait(&request, &status);
418
419   TRACE_smpi_comm_out(rank);
420   if (is_wait_for_receive)
421     TRACE_smpi_recv(args.src, args.dst, args.tag);
422 }
423
424 void SendAction::kernel(simgrid::xbt::ReplayAction& action)
425 {
426   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
427
428   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
429         args.tag, Datatype::encode(args.datatype1)));
430   if (not TRACE_smpi_view_internals())
431     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
432
433   if (name == "send") {
434     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
435   } else if (name == "Isend") {
436     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
437     req_storage.add(request);
438   } else {
439     xbt_die("Don't know this action, %s", name.c_str());
440   }
441
442   TRACE_smpi_comm_out(my_proc_id);
443 }
444
445 void RecvAction::kernel(simgrid::xbt::ReplayAction& action)
446 {
447   int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
448
449   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
450         args.tag, Datatype::encode(args.datatype1)));
451
452   MPI_Status status;
453   // unknown size from the receiver point of view
454   if (args.size <= 0.0) {
455     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
456     args.size = status.count;
457   }
458
459   if (name == "recv") {
460     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
461   } else if (name == "Irecv") {
462     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
463     req_storage.add(request);
464   }
465
466   TRACE_smpi_comm_out(my_proc_id);
467   // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
468   if (name == "recv" && not TRACE_smpi_view_internals()) {
469     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
470   }
471 }
472
473 void ComputeAction::kernel(simgrid::xbt::ReplayAction& action)
474 {
475   TRACE_smpi_computing_in(my_proc_id, args.flops);
476   smpi_execute_flops(args.flops);
477   TRACE_smpi_computing_out(my_proc_id);
478 }
479
480 void TestAction::kernel(simgrid::xbt::ReplayAction& action)
481 {
482   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
483   req_storage.remove(request);
484   // if request is null here, this may mean that a previous test has succeeded
485   // Different times in traced application and replayed version may lead to this
486   // In this case, ignore the extra calls.
487   if (request != MPI_REQUEST_NULL) {
488     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
489
490     MPI_Status status;
491     int flag = Request::test(&request, &status);
492
493     XBT_DEBUG("MPI_Test result: %d", flag);
494     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
495      * nullptr.*/
496     if (request == MPI_REQUEST_NULL)
497       req_storage.addNullRequest(args.src, args.dst, args.tag);
498     else
499       req_storage.add(request);
500
501     TRACE_smpi_comm_out(my_proc_id);
502   }
503 }
504
505 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
506 {
507   CHECK_ACTION_PARAMS(action, 0, 1)
508     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
509     : MPI_BYTE;  // default TAU datatype
510
511   /* start a simulated timer */
512   smpi_process()->simulated_start();
513 }
514
515 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction& action)
516 {
517   /* nothing to do */
518 }
519
520 void WaitAllAction::kernel(simgrid::xbt::ReplayAction& action)
521 {
522   const unsigned int count_requests = req_storage.size();
523
524   if (count_requests > 0) {
525     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
526     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
527     std::vector<MPI_Request> reqs;
528     req_storage.get_requests(reqs);
529     for (const auto& req : reqs) {
530       if (req && (req->flags() & MPI_REQ_RECV)) {
531         sender_receiver.push_back({req->src(), req->dst()});
532       }
533     }
534     MPI_Status status[count_requests];
535     Request::waitall(count_requests, &(reqs.data())[0], status);
536     req_storage.get_store().clear();
537
538     for (auto& pair : sender_receiver) {
539       TRACE_smpi_recv(pair.first, pair.second, 0);
540     }
541     TRACE_smpi_comm_out(my_proc_id);
542   }
543 }
544
545 void BarrierAction::kernel(simgrid::xbt::ReplayAction& action)
546 {
547   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
548   Colls::barrier(MPI_COMM_WORLD);
549   TRACE_smpi_comm_out(my_proc_id);
550 }
551
552 void BcastAction::kernel(simgrid::xbt::ReplayAction& action)
553 {
554   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
555       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
556         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
557
558   Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
559
560   TRACE_smpi_comm_out(my_proc_id);
561 }
562
563 void ReduceAction::kernel(simgrid::xbt::ReplayAction& action)
564 {
565   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
566       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
567         args.comp_size, args.comm_size, -1,
568         Datatype::encode(args.datatype1), ""));
569
570   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
571       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
572   smpi_execute_flops(args.comp_size);
573
574   TRACE_smpi_comm_out(my_proc_id);
575 }
576
577 void AllReduceAction::kernel(simgrid::xbt::ReplayAction& action)
578 {
579   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
580         Datatype::encode(args.datatype1), ""));
581
582   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
583       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
584   smpi_execute_flops(args.comp_size);
585
586   TRACE_smpi_comm_out(my_proc_id);
587 }
588
589 void AllToAllAction::kernel(simgrid::xbt::ReplayAction& action)
590 {
591   TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
592       new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
593         Datatype::encode(args.datatype1),
594         Datatype::encode(args.datatype2)));
595
596   Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
597       args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
598       args.recv_size, args.datatype2, MPI_COMM_WORLD);
599
600   TRACE_smpi_comm_out(my_proc_id);
601 }
602
603 void GatherAction::kernel(simgrid::xbt::ReplayAction& action)
604 {
605   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
606         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
607
608   if (name == "gather") {
609     int rank = MPI_COMM_WORLD->rank();
610     Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
611         (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
612   }
613   else
614     Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
615         recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
616
617   TRACE_smpi_comm_out(my_proc_id);
618 }
619
620 void GatherVAction::kernel(simgrid::xbt::ReplayAction& action)
621 {
622   int rank = MPI_COMM_WORLD->rank();
623
624   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
625         name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
626         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
627
628   if (name == "gatherV") {
629     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
630         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
631         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
632   }
633   else {
634     Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
635         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
636         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
637   }
638
639   TRACE_smpi_comm_out(my_proc_id);
640 }
641
642 void ScatterAction::kernel(simgrid::xbt::ReplayAction& action)
643 {
644   int rank = MPI_COMM_WORLD->rank();
645   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
646         Datatype::encode(args.datatype1),
647         Datatype::encode(args.datatype2)));
648
649   Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
650       (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
651
652   TRACE_smpi_comm_out(my_proc_id);
653 }
654
655 void ScatterVAction::kernel(simgrid::xbt::ReplayAction& action)
656 {
657   int rank = MPI_COMM_WORLD->rank();
658   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
659         nullptr, Datatype::encode(args.datatype1),
660         Datatype::encode(args.datatype2)));
661
662   Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
663       args.sendcounts->data(), args.disps.data(), args.datatype1,
664       recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
665       MPI_COMM_WORLD);
666
667   TRACE_smpi_comm_out(my_proc_id);
668 }
669
670 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction& action)
671 {
672   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
673       new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
674         std::to_string(args.comp_size), /* ugly hack to print comp_size */
675         Datatype::encode(args.datatype1)));
676
677   Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
678       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
679       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
680
681   smpi_execute_flops(args.comp_size);
682   TRACE_smpi_comm_out(my_proc_id);
683 }
684
685 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction& action)
686 {
687   TRACE_smpi_comm_in(my_proc_id, __func__,
688       new simgrid::instr::VarCollTIData(
689         "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
690         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
691
692   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
693       recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
694
695   TRACE_smpi_comm_out(my_proc_id);
696 }
697 } // Replay Namespace
698 }} // namespace simgrid::smpi
699
700 static std::vector<simgrid::smpi::replay::RequestStorage> storage;
701 /** @brief Only initialize the replay, don't do it for real */
702 void smpi_replay_init(int* argc, char*** argv)
703 {
704   simgrid::smpi::Process::init(argc, argv);
705   smpi_process()->mark_as_initialized();
706   smpi_process()->set_replaying(true);
707
708   int my_proc_id = simgrid::s4u::this_actor::get_pid();
709   storage.resize(smpi_process_count());
710
711   TRACE_smpi_init(my_proc_id);
712   TRACE_smpi_computing_init(my_proc_id);
713   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
714   TRACE_smpi_comm_out(my_proc_id);
715   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
716   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
717   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
718   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
719   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
720   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
721   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
722   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
723   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv", storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
724   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
725   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
726   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()-1]).execute(action); });
727   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
728   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
729   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
730   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
731   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
732   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
733   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
734   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
735   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
736   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
737   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
738   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
739   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
740   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
741
742   //if we have a delayed start, sleep here.
743   if(*argc>2){
744     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
745     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
746     smpi_execute_flops(value);
747   } else {
748     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
749     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
750     smpi_execute_flops(0.0);
751   }
752 }
753
754 /** @brief actually run the replay after initialization */
755 void smpi_replay_main(int* argc, char*** argv)
756 {
757   static int active_processes = 0;
758   active_processes++;
759   simgrid::xbt::replay_runner(*argc, *argv);
760
761   /* and now, finalize everything */
762   /* One active process will stop. Decrease the counter*/
763   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid() - 1].size();
764   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
765   if (count_requests > 0) {
766     MPI_Request requests[count_requests];
767     MPI_Status status[count_requests];
768     unsigned int i=0;
769
770     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid() - 1].get_store()) {
771       requests[i] = pair.second;
772       i++;
773     }
774     simgrid::smpi::Request::waitall(count_requests, requests, status);
775   }
776   active_processes--;
777
778   if(active_processes==0){
779     /* Last process alive speaking: end the simulated timer */
780     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
781     smpi_free_replay_tmp_buffers();
782   }
783
784   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
785                      new simgrid::instr::NoOpTIData("finalize"));
786
787   smpi_process()->finalize();
788
789   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
790   TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
791 }
792
793 /** @brief chain a replay initialization and a replay start */
794 void smpi_replay_run(int* argc, char*** argv)
795 {
796   smpi_replay_init(argc, argv);
797   smpi_replay_main(argc, argv);
798 }