Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add call_location to ti-trace and replay as well.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 #include <tuple>
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
23
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
27 namespace hash_tuple {
28 template <typename TT> class hash {
29 public:
30   size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
31 };
32
33 template <class T> inline void hash_combine(std::size_t& seed, T const& v)
34 {
35   seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
36 }
37
38 // Recursive template code derived from Matthieu M.
39 template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
40 public:
41   static void apply(size_t& seed, Tuple const& tuple)
42   {
43     HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
44     hash_combine(seed, std::get<Index>(tuple));
45   }
46 };
47
48 template <class Tuple> class HashValueImpl<Tuple, 0> {
49 public:
50   static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
51 };
52
53 template <typename... TT> class hash<std::tuple<TT...>> {
54 public:
55   size_t operator()(std::tuple<TT...> const& tt) const
56   {
57     size_t seed = 0;
58     HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
59     return seed;
60   }
61 };
62 }
63
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
66
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
68 {
69   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70     std::string s = boost::algorithm::join(action, " ");
71     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
72   }
73 }
74
75 /* Helper function */
76 static double parse_double(const std::string& string)
77 {
78   return xbt_str_parse_double(string.c_str(), "%s is not a double");
79 }
80
81 namespace simgrid {
82 namespace smpi {
83
84 namespace replay {
85 MPI_Datatype MPI_DEFAULT_TYPE;
86
87 class RequestStorage {
88 private:
89     req_storage_t store;
90
91 public:
92     RequestStorage() {}
93     int size()
94     {
95       return store.size();
96     }
97
98     req_storage_t& get_store()
99     {
100       return store;
101     }
102
103     void get_requests(std::vector<MPI_Request>& vec)
104     {
105       for (auto& pair : store) {
106         auto& req = pair.second;
107         auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108         if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109           vec.push_back(pair.second);
110           pair.second->print_request("MM");
111         }
112       }
113     }
114
115     MPI_Request find(int src, int dst, int tag)
116     {
117       req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118       return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
119     }
120
121     void remove(MPI_Request req)
122     {
123       if (req == MPI_REQUEST_NULL) return;
124
125       store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
126     }
127
128     void add(MPI_Request req)
129     {
130       if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131         store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
132     }
133
134     /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135     void addNullRequest(int src, int dst, int tag)
136     {
137       store.insert({req_key_t(
138             MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139             MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140             tag), MPI_REQUEST_NULL});
141     }
142 };
143
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
145 {
146   CHECK_ACTION_PARAMS(action, 3, 0)
147   src = std::stoi(action[2]);
148   dst = std::stoi(action[3]);
149   tag = std::stoi(action[4]);
150 }
151
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
153 {
154   CHECK_ACTION_PARAMS(action, 3, 1)
155   partner = std::stoi(action[2]);
156   tag     = std::stoi(action[3]);
157   size    = parse_double(action[4]);
158   if (action.size() > 5)
159     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
160 }
161
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   flops = parse_double(action[2]);
166 }
167
168 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
169 {
170   CHECK_ACTION_PARAMS(action, 2, 0)
171   filename = std::string(action[2]);
172   line = std::stoi(action[3]);
173 }
174
175 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 {
177   CHECK_ACTION_PARAMS(action, 1, 2)
178   size = parse_double(action[2]);
179   root = (action.size() > 3) ? std::stoi(action[3]) : 0;
180   if (action.size() > 4)
181     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
182 }
183
184 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
185 {
186   CHECK_ACTION_PARAMS(action, 2, 2)
187   comm_size = parse_double(action[2]);
188   comp_size = parse_double(action[3]);
189   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
190   if (action.size() > 5)
191     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
192 }
193
194 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
195 {
196   CHECK_ACTION_PARAMS(action, 2, 1)
197   comm_size = parse_double(action[2]);
198   comp_size = parse_double(action[3]);
199   if (action.size() > 4)
200     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
201 }
202
203 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
204 {
205   CHECK_ACTION_PARAMS(action, 2, 1)
206   comm_size = MPI_COMM_WORLD->size();
207   send_size = parse_double(action[2]);
208   recv_size = parse_double(action[3]);
209
210   if (action.size() > 4)
211     datatype1 = simgrid::smpi::Datatype::decode(action[4]);
212   if (action.size() > 5)
213     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
214 }
215
216 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
217 {
218   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
219         0 gather 68 68 0 0 0
220       where:
221         1) 68 is the sendcounts
222         2) 68 is the recvcounts
223         3) 0 is the root node
224         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
225         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
226   */
227   CHECK_ACTION_PARAMS(action, 2, 3)
228   comm_size = MPI_COMM_WORLD->size();
229   send_size = parse_double(action[2]);
230   recv_size = parse_double(action[3]);
231
232   if (name == "gather") {
233     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
234     if (action.size() > 5)
235       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
236     if (action.size() > 6)
237       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
238   } else {
239     if (action.size() > 4)
240       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
241     if (action.size() > 5)
242       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
243   }
244 }
245
246 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
247 {
248   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
249        0 gather 68 68 10 10 10 0 0 0
250      where:
251        1) 68 is the sendcount
252        2) 68 10 10 10 is the recvcounts
253        3) 0 is the root node
254        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
255        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
256   */
257   comm_size = MPI_COMM_WORLD->size();
258   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
259   send_size  = parse_double(action[2]);
260   disps      = std::vector<int>(comm_size, 0);
261   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
262
263   if (name == "gatherv") {
264     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
265     if (action.size() > 4 + comm_size)
266       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
267     if (action.size() > 5 + comm_size)
268       datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
269   } else {
270     int datatype_index = 0;
271     int disp_index     = 0;
272     /* The 3 comes from "0 gather <sendcount>", which must always be present.
273      * The + comm_size is the recvcounts array, which must also be present
274      */
275     if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
276       datatype_index = 3 + comm_size;
277       disp_index     = datatype_index + 1;
278       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
279       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
280     } else if (action.size() >
281                3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
282       disp_index = 3 + comm_size;
283     } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
284       datatype_index = 3 + comm_size;
285       datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
286       datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
287     }
288
289     if (disp_index != 0) {
290       for (unsigned int i = 0; i < comm_size; i++)
291         disps[i]          = std::stoi(action[disp_index + i]);
292     }
293   }
294
295   for (unsigned int i = 0; i < comm_size; i++) {
296     (*recvcounts)[i] = std::stoi(action[i + 3]);
297   }
298   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
299 }
300
301 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
302 {
303   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
304         0 gather 68 68 0 0 0
305       where:
306         1) 68 is the sendcounts
307         2) 68 is the recvcounts
308         3) 0 is the root node
309         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
310         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
311   */
312   CHECK_ACTION_PARAMS(action, 2, 3)
313   comm_size = MPI_COMM_WORLD->size();
314   send_size = parse_double(action[2]);
315   recv_size = parse_double(action[3]);
316   root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
317   if (action.size() > 5)
318     datatype1 = simgrid::smpi::Datatype::decode(action[5]);
319   if (action.size() > 6)
320     datatype2 = simgrid::smpi::Datatype::decode(action[6]);
321 }
322
323 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
324 {
325   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
326      0 gather 68 10 10 10 68 0 0 0
327       where:
328       1) 68 10 10 10 is the sendcounts
329       2) 68 is the recvcount
330       3) 0 is the root node
331       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
332       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
333   */
334   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
335   recv_size  = parse_double(action[2 + comm_size]);
336   disps      = std::vector<int>(comm_size, 0);
337   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
338
339   if (action.size() > 5 + comm_size)
340     datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
341   if (action.size() > 5 + comm_size)
342     datatype2 = simgrid::smpi::Datatype::decode(action[5]);
343
344   for (unsigned int i = 0; i < comm_size; i++) {
345     (*sendcounts)[i] = std::stoi(action[i + 2]);
346   }
347   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
348   root          = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
349 }
350
351 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
352 {
353   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
354        0 reducescatter 275427 275427 275427 204020 11346849 0
355      where:
356        1) The first four values after the name of the action declare the recvcounts array
357        2) The value 11346849 is the amount of instructions
358        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
359   */
360   comm_size = MPI_COMM_WORLD->size();
361   CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
362   comp_size  = parse_double(action[2 + comm_size]);
363   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
364   if (action.size() > 3 + comm_size)
365     datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
366
367   for (unsigned int i = 0; i < comm_size; i++) {
368     recvcounts->push_back(std::stoi(action[i + 2]));
369   }
370   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
371 }
372
373 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
374 {
375   /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
376         0 alltoallv 100 1 7 10 12 100 1 70 10 5
377      where:
378       1) 100 is the size of the send buffer *sizeof(int),
379       2) 1 7 10 12 is the sendcounts array
380       3) 100*sizeof(int) is the size of the receiver buffer
381       4)  1 70 10 5 is the recvcounts array
382   */
383   comm_size = MPI_COMM_WORLD->size();
384   CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
385   sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
386   recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
387   senddisps  = std::vector<int>(comm_size, 0);
388   recvdisps  = std::vector<int>(comm_size, 0);
389
390   if (action.size() > 5 + 2 * comm_size)
391     datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
392   if (action.size() > 5 + 2 * comm_size)
393     datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
394
395   send_buf_size = parse_double(action[2]);
396   recv_buf_size = parse_double(action[3 + comm_size]);
397   for (unsigned int i = 0; i < comm_size; i++) {
398     (*sendcounts)[i] = std::stoi(action[3 + i]);
399     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
400   }
401   send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
402   recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
403 }
404
405 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
406 {
407   std::string s = boost::algorithm::join(action, " ");
408   xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
409   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
410   req_storage.remove(request);
411
412   if (request == MPI_REQUEST_NULL) {
413     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
414      * return.*/
415     return;
416   }
417
418   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
419
420   // Must be taken before Request::wait() since the request may be set to
421   // MPI_REQUEST_NULL by Request::wait!
422   bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
423   // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
424   TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
425
426   MPI_Status status;
427   Request::wait(&request, &status);
428
429   TRACE_smpi_comm_out(rank);
430   if (is_wait_for_receive)
431     TRACE_smpi_recv(args.src, args.dst, args.tag);
432 }
433
434 void SendAction::kernel(simgrid::xbt::ReplayAction&)
435 {
436   int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
437
438   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
439         args.tag, Datatype::encode(args.datatype1)));
440   if (not TRACE_smpi_view_internals())
441     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
442
443   if (name == "send") {
444     Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
445   } else if (name == "isend") {
446     MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447     req_storage.add(request);
448   } else {
449     xbt_die("Don't know this action, %s", name.c_str());
450   }
451
452   TRACE_smpi_comm_out(my_proc_id);
453 }
454
455 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
456 {
457   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
458         args.tag, Datatype::encode(args.datatype1)));
459
460   MPI_Status status;
461   // unknown size from the receiver point of view
462   if (args.size <= 0.0) {
463     Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
464     args.size = status.count;
465   }
466
467   bool is_recv = false; // Help analyzers understanding that status is not used unintialized
468   if (name == "recv") {
469     is_recv = true;
470     Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
471   } else if (name == "irecv") {
472     MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
473     req_storage.add(request);
474   } else {
475     THROW_IMPOSSIBLE;
476   }
477
478   TRACE_smpi_comm_out(my_proc_id);
479   if (is_recv && not TRACE_smpi_view_internals()) {
480     int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
481     TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
482   }
483 }
484
485 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
486 {
487   smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
488 }
489
490 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
491 {
492   smpi_trace_set_call_location(args.filename.c_str(), args.line);
493 }
494
495 void TestAction::kernel(simgrid::xbt::ReplayAction&)
496 {
497   MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
498   req_storage.remove(request);
499   // if request is null here, this may mean that a previous test has succeeded
500   // Different times in traced application and replayed version may lead to this
501   // In this case, ignore the extra calls.
502   if (request != MPI_REQUEST_NULL) {
503     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
504
505     MPI_Status status;
506     int flag = 0;
507     Request::test(&request, &status, &flag);
508
509     XBT_DEBUG("MPI_Test result: %d", flag);
510     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
511      * nullptr.*/
512     if (request == MPI_REQUEST_NULL)
513       req_storage.addNullRequest(args.src, args.dst, args.tag);
514     else
515       req_storage.add(request);
516
517     TRACE_smpi_comm_out(my_proc_id);
518   }
519 }
520
521 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
522 {
523   CHECK_ACTION_PARAMS(action, 0, 1)
524     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
525     : MPI_BYTE;  // default TAU datatype
526
527   /* start a simulated timer */
528   smpi_process()->simulated_start();
529 }
530
531 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
532 {
533   /* nothing to do */
534 }
535
536 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
537 {
538   const unsigned int count_requests = req_storage.size();
539
540   if (count_requests > 0) {
541     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
542     std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
543     std::vector<MPI_Request> reqs;
544     req_storage.get_requests(reqs);
545     for (const auto& req : reqs) {
546       if (req && (req->flags() & MPI_REQ_RECV)) {
547         sender_receiver.push_back({req->src(), req->dst()});
548       }
549     }
550     MPI_Status status[count_requests];
551     Request::waitall(count_requests, &(reqs.data())[0], status);
552     req_storage.get_store().clear();
553
554     for (auto& pair : sender_receiver) {
555       TRACE_smpi_recv(pair.first, pair.second, 0);
556     }
557     TRACE_smpi_comm_out(my_proc_id);
558   }
559 }
560
561 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
562 {
563   TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
564   Colls::barrier(MPI_COMM_WORLD);
565   TRACE_smpi_comm_out(my_proc_id);
566 }
567
568 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
569 {
570   TRACE_smpi_comm_in(my_proc_id, "action_bcast",
571       new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
572         -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
573
574   Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
575
576   TRACE_smpi_comm_out(my_proc_id);
577 }
578
579 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
580 {
581   TRACE_smpi_comm_in(my_proc_id, "action_reduce",
582       new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
583         args.comp_size, args.comm_size, -1,
584         Datatype::encode(args.datatype1), ""));
585
586   Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
587       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
588   private_execute_flops(args.comp_size);
589
590   TRACE_smpi_comm_out(my_proc_id);
591 }
592
593 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
594 {
595   TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
596         Datatype::encode(args.datatype1), ""));
597
598   Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
599       recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
600   private_execute_flops(args.comp_size);
601
602   TRACE_smpi_comm_out(my_proc_id);
603 }
604
605 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
606 {
607   TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
608       new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
609         Datatype::encode(args.datatype1),
610         Datatype::encode(args.datatype2)));
611
612   Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
613       args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
614       args.recv_size, args.datatype2, MPI_COMM_WORLD);
615
616   TRACE_smpi_comm_out(my_proc_id);
617 }
618
619 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
620 {
621   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
622         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
623
624   if (name == "gather") {
625     int rank = MPI_COMM_WORLD->rank();
626     Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
627         (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
628   }
629   else
630     Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
631         recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
632
633   TRACE_smpi_comm_out(my_proc_id);
634 }
635
636 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
637 {
638   int rank = MPI_COMM_WORLD->rank();
639
640   TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
641         name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
642         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
643
644   if (name == "gatherv") {
645     Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
646         (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
647         args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
648   }
649   else {
650     Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
651         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
652         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
653   }
654
655   TRACE_smpi_comm_out(my_proc_id);
656 }
657
658 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
659 {
660   int rank = MPI_COMM_WORLD->rank();
661   TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
662         Datatype::encode(args.datatype1),
663         Datatype::encode(args.datatype2)));
664
665   Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
666       (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
667
668   TRACE_smpi_comm_out(my_proc_id);
669 }
670
671 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
672 {
673   int rank = MPI_COMM_WORLD->rank();
674   TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
675         nullptr, Datatype::encode(args.datatype1),
676         Datatype::encode(args.datatype2)));
677
678   Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
679       args.sendcounts->data(), args.disps.data(), args.datatype1,
680       recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
681       MPI_COMM_WORLD);
682
683   TRACE_smpi_comm_out(my_proc_id);
684 }
685
686 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
687 {
688   TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
689       new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
690         std::to_string(args.comp_size), /* ugly hack to print comp_size */
691         Datatype::encode(args.datatype1)));
692
693   Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
694       recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
695       args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
696
697   private_execute_flops(args.comp_size);
698   TRACE_smpi_comm_out(my_proc_id);
699 }
700
701 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
702 {
703   TRACE_smpi_comm_in(my_proc_id, __func__,
704       new simgrid::instr::VarCollTIData(
705         "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
706         Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
707
708   Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
709       recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
710
711   TRACE_smpi_comm_out(my_proc_id);
712 }
713 } // Replay Namespace
714 }} // namespace simgrid::smpi
715
716 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
717 /** @brief Only initialize the replay, don't do it for real */
718 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
719 {
720   xbt_assert(not smpi_process()->initializing());
721
722   simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
723   simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
724   simgrid::smpi::ActorExt::init();
725
726   smpi_process()->mark_as_initialized();
727   smpi_process()->set_replaying(true);
728
729   int my_proc_id = simgrid::s4u::this_actor::get_pid();
730
731   TRACE_smpi_init(my_proc_id);
732   TRACE_smpi_computing_init(my_proc_id);
733   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
734   TRACE_smpi_comm_out(my_proc_id);
735   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
736   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
737   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
738   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
739   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
740   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
741   xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
742   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
743   xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
744   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
745   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
746   xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
747   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
748   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
749   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
750   xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
751   xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
752   xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
753   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
754   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
755   xbt_replay_action_register("gatherv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
756   xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
757   xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
758   xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
759   xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
760   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
761   xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
762
763   //if we have a delayed start, sleep here.
764   if (start_delay_flops > 0) {
765     XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
766     private_execute_flops(start_delay_flops);
767   } else {
768     // Wait for the other actors to initialize also
769     simgrid::s4u::this_actor::yield();
770   }
771 }
772
773 /** @brief actually run the replay after initialization */
774 void smpi_replay_main(int rank, const char* trace_filename)
775 {
776   static int active_processes = 0;
777   active_processes++;
778   storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
779   std::string rank_string                      = std::to_string(rank);
780   simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
781
782   /* and now, finalize everything */
783   /* One active process will stop. Decrease the counter*/
784   unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
785   XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
786   if (count_requests > 0) {
787     MPI_Request requests[count_requests];
788     MPI_Status status[count_requests];
789     unsigned int i=0;
790
791     for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
792       requests[i] = pair.second;
793       i++;
794     }
795     simgrid::smpi::Request::waitall(count_requests, requests, status);
796   }
797   active_processes--;
798
799   if(active_processes==0){
800     /* Last process alive speaking: end the simulated timer */
801     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
802     smpi_free_replay_tmp_buffers();
803   }
804
805   TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
806                      new simgrid::instr::NoOpTIData("finalize"));
807
808   smpi_process()->finalize();
809
810   TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
811 }
812
813 /** @brief chain a replay initialization and a replay start */
814 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
815 {
816   smpi_replay_init(instance_id, rank, start_delay_flops);
817   smpi_replay_main(rank, trace_filename);
818 }