Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2d03b912720bc1e137280832a933c679eb043482
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <sstream>
20 #include <vector>
21
22 using simgrid::s4u::Actor;
23
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2)) {                                                     \
33       std::stringstream ss; \
34       for (const auto& elem : action) { \
35         ss << elem << " "; \
36       } \
37       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
38                            "%zu items were given on the line. First two should be process_id and action.  "            \
39                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
40                            "The full line that was given is:\n   %s\n" \
41                            "Please contact the Simgrid team if support is needed",                                     \
42              __func__, action.size(), static_cast<unsigned long>(mandatory), static_cast<unsigned long>(optional), ss.str().c_str());    \
43     }\
44   }
45
46 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
47 {
48   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
49     std::string s = boost::algorithm::join(action, " ");
50     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
51   }
52 }
53
54 static std::vector<MPI_Request>* get_reqq_self()
55 {
56   return reqq.at(simgrid::s4u::this_actor::getPid());
57 }
58
59 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
60 {
61   reqq.insert({simgrid::s4u::this_actor::getPid(), mpi_request});
62 }
63
64 /* Helper function */
65 static double parse_double(std::string string)
66 {
67   return xbt_str_parse_double(string.c_str(), "%s is not a double");
68 }
69
70 namespace simgrid {
71 namespace smpi {
72
73 namespace replay {
74 class ActionArgParser {
75 public:
76   virtual ~ActionArgParser() = default;
77   virtual void parse(simgrid::xbt::ReplayAction& action, std::string name) { CHECK_ACTION_PARAMS(action, 0, 0) }
78 };
79
80 class SendRecvParser : public ActionArgParser {
81 public:
82   /* communication partner; if we send, this is the receiver and vice versa */
83   int partner;
84   double size;
85   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
86
87   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
88   {
89     CHECK_ACTION_PARAMS(action, 2, 1)
90     partner = std::stoi(action[2]);
91     size    = parse_double(action[3]);
92     if (action.size() > 4)
93       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
94   }
95 };
96
97 class ComputeParser : public ActionArgParser {
98 public:
99   /* communication partner; if we send, this is the receiver and vice versa */
100   double flops;
101
102   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
103   {
104     CHECK_ACTION_PARAMS(action, 1, 0)
105     flops = parse_double(action[2]);
106   }
107 };
108
109 class CollCommParser : public ActionArgParser {
110 public:
111   double size;
112   double comm_size;
113   double comp_size;
114   int send_size;
115   int recv_size;
116   int root = 0;
117   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
118   MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
119 };
120
121 class BcastArgParser : public CollCommParser {
122 public:
123   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
124   {
125     CHECK_ACTION_PARAMS(action, 1, 2)
126     size = parse_double(action[2]);
127     root = (action.size() > 3) ? std::stoi(action[3]) : 0;
128     if (action.size() > 4)
129       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
130   }
131 };
132
133 class ReduceArgParser : public CollCommParser {
134 public:
135   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
136   {
137     CHECK_ACTION_PARAMS(action, 2, 2)
138     comm_size = parse_double(action[2]);
139     comp_size = parse_double(action[3]);
140     root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
141     if (action.size() > 5)
142       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
143   }
144 };
145
146 class AllReduceArgParser : public CollCommParser {
147 public:
148   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
149   {
150     CHECK_ACTION_PARAMS(action, 2, 1)
151     comm_size = parse_double(action[2]);
152     comp_size = parse_double(action[3]);
153     if (action.size() > 4)
154       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
155   }
156 };
157
158 class AllToAllArgParser : public CollCommParser {
159 public:
160   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
161   {
162     CHECK_ACTION_PARAMS(action, 2, 1)
163     comm_size = MPI_COMM_WORLD->size();
164     send_size = parse_double(action[2]);
165     recv_size = parse_double(action[3]);
166
167     if (action.size() > 4)
168       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
169     if (action.size() > 5)
170       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
171   }
172 };
173
174 class GatherArgParser : public CollCommParser {
175 public:
176   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
177   {
178     /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
179           0 gather 68 68 0 0 0
180         where:
181           1) 68 is the sendcounts
182           2) 68 is the recvcounts
183           3) 0 is the root node
184           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
185           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
186     */
187     CHECK_ACTION_PARAMS(action, 2, 3)
188     comm_size = MPI_COMM_WORLD->size();
189     send_size = parse_double(action[2]);
190     recv_size = parse_double(action[3]);
191
192     if (name == "gather") {
193       root      = (action.size() > 4) ? std::stoi(action[4]) : 0;
194       if (action.size() > 5)
195         datatype1 = simgrid::smpi::Datatype::decode(action[5]);
196       if (action.size() > 6)
197         datatype2 = simgrid::smpi::Datatype::decode(action[6]);
198     }
199     else {
200       if (action.size() > 4)
201         datatype1 = simgrid::smpi::Datatype::decode(action[4]);
202       if (action.size() > 5)
203         datatype2 = simgrid::smpi::Datatype::decode(action[5]);
204     }
205   }
206 };
207
208 class GatherVArgParser : public CollCommParser {
209 public:
210   int recv_size_sum;
211   std::shared_ptr<std::vector<int>> recvcounts;
212   std::vector<int> disps;
213   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
214   {
215     /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
216          0 gather 68 68 10 10 10 0 0 0
217        where:
218          1) 68 is the sendcount
219          2) 68 10 10 10 is the recvcounts
220          3) 0 is the root node
221          4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
222          5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
223     */
224     comm_size = MPI_COMM_WORLD->size();
225     CHECK_ACTION_PARAMS(action, comm_size+1, 2)
226     send_size = parse_double(action[2]);
227     disps     = std::vector<int>(comm_size, 0);
228     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
229
230     if (name == "gatherV") {
231       root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
232       if (action.size() > 4 + comm_size)
233         datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
234       if (action.size() > 5 + comm_size)
235         datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
236     }
237     else {
238       int datatype_index = 0;
239       int disp_index     = 0;
240       /* The 3 comes from "0 gather <sendcount>", which must always be present.
241        * The + comm_size is the recvcounts array, which must also be present
242        */
243       if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
244         datatype_index = 3 + comm_size;
245         disp_index     = datatype_index + 1;
246         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
247         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
248       } else if (action.size() > 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
249         disp_index     = 3 + comm_size;
250       } else if (action.size() > 3 + comm_size)  { /* only datatype, no disp specified */
251         datatype_index = 3 + comm_size;
252         datatype1      = simgrid::smpi::Datatype::decode(action[datatype_index]);
253         datatype2      = simgrid::smpi::Datatype::decode(action[datatype_index]);
254       }
255
256       if (disp_index != 0) {
257         for (unsigned int i = 0; i < comm_size; i++)
258           disps[i]          = std::stoi(action[disp_index + i]);
259       }
260     }
261
262     for (unsigned int i = 0; i < comm_size; i++) {
263       (*recvcounts)[i] = std::stoi(action[i + 3]);
264     }
265     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
266   }
267 };
268
269 class ScatterArgParser : public CollCommParser {
270 public:
271   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
272   {
273     /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
274           0 gather 68 68 0 0 0
275         where:
276           1) 68 is the sendcounts
277           2) 68 is the recvcounts
278           3) 0 is the root node
279           4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
280           5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
281     */
282     CHECK_ACTION_PARAMS(action, 2, 3)
283     comm_size   = MPI_COMM_WORLD->size();
284     send_size   = parse_double(action[2]);
285     recv_size   = parse_double(action[3]);
286     root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
287     if (action.size() > 5)
288       datatype1 = simgrid::smpi::Datatype::decode(action[5]);
289     if (action.size() > 6)
290       datatype2 = simgrid::smpi::Datatype::decode(action[6]);
291   }
292 };
293
294 class ScatterVArgParser : public CollCommParser {
295 public:
296   int recv_size_sum;
297   int send_size_sum;
298   std::shared_ptr<std::vector<int>> sendcounts;
299   std::vector<int> disps;
300   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
301   {
302     /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
303        0 gather 68 10 10 10 68 0 0 0
304         where:
305         1) 68 10 10 10 is the sendcounts
306         2) 68 is the recvcount
307         3) 0 is the root node
308         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
309         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
310     */
311     CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
312     recv_size  = parse_double(action[2 + comm_size]);
313     disps      = std::vector<int>(comm_size, 0);
314     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
315
316     if (action.size() > 5 + comm_size)
317       datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
318     if (action.size() > 5 + comm_size)
319       datatype2 = simgrid::smpi::Datatype::decode(action[5]);
320
321     for (unsigned int i = 0; i < comm_size; i++) {
322       (*sendcounts)[i] = std::stoi(action[i + 2]);
323     }
324     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
325     root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
326   }
327 };
328
329 class ReduceScatterArgParser : public CollCommParser {
330 public:
331   int recv_size_sum;
332   std::shared_ptr<std::vector<int>> recvcounts;
333   std::vector<int> disps;
334   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
335   {
336     /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
337          0 reduceScatter 275427 275427 275427 204020 11346849 0
338        where:
339          1) The first four values after the name of the action declare the recvcounts array
340          2) The value 11346849 is the amount of instructions
341          3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
342     */
343     comm_size = MPI_COMM_WORLD->size();
344     CHECK_ACTION_PARAMS(action, comm_size+1, 1)
345     comp_size = parse_double(action[2+comm_size]);
346     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
347     if (action.size() > 3 + comm_size)
348       datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
349
350     for (unsigned int i = 0; i < comm_size; i++) {
351       recvcounts->push_back(std::stoi(action[i + 2]));
352     }
353     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
354   }
355 };
356
357 class AllToAllVArgParser : public CollCommParser {
358 public:
359   int recv_size_sum;
360   int send_size_sum;
361   std::shared_ptr<std::vector<int>> recvcounts;
362   std::shared_ptr<std::vector<int>> sendcounts;
363   std::vector<int> senddisps;
364   std::vector<int> recvdisps;
365   int send_buf_size;
366   int recv_buf_size;
367   void parse(simgrid::xbt::ReplayAction& action, std::string name) override
368   {
369     /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
370           0 allToAllV 100 1 7 10 12 100 1 70 10 5
371        where:
372         1) 100 is the size of the send buffer *sizeof(int),
373         2) 1 7 10 12 is the sendcounts array
374         3) 100*sizeof(int) is the size of the receiver buffer
375         4)  1 70 10 5 is the recvcounts array
376     */
377     comm_size = MPI_COMM_WORLD->size();
378     CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
379     sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380     recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
381     senddisps  = std::vector<int>(comm_size, 0);
382     recvdisps  = std::vector<int>(comm_size, 0);
383
384     if (action.size() > 5 + 2 * comm_size)
385       datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
386     if (action.size() > 5 + 2 * comm_size)
387       datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
388
389     send_buf_size=parse_double(action[2]);
390     recv_buf_size=parse_double(action[3+comm_size]);
391     for (unsigned int i = 0; i < comm_size; i++) {
392       (*sendcounts)[i] = std::stoi(action[3 + i]);
393       (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
394     }
395     send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
396     recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
397   }
398 };
399
400 template <class T> class ReplayAction {
401 protected:
402   const std::string name;
403   const int my_proc_id;
404   T args;
405
406 public:
407   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::this_actor::getPid()) {}
408   virtual ~ReplayAction() = default;
409
410   virtual void execute(simgrid::xbt::ReplayAction& action)
411   {
412     // Needs to be re-initialized for every action, hence here
413     double start_time = smpi_process()->simulated_elapsed();
414     args.parse(action, name);
415     kernel(action);
416     if (name != "Init")
417       log_timed_action(action, start_time);
418   }
419
420   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
421
422   void* send_buffer(int size)
423   {
424     return smpi_get_tmp_sendbuffer(size);
425   }
426
427   void* recv_buffer(int size)
428   {
429     return smpi_get_tmp_recvbuffer(size);
430   }
431 };
432
433 class WaitAction : public ReplayAction<ActionArgParser> {
434 public:
435   WaitAction() : ReplayAction("Wait") {}
436   void kernel(simgrid::xbt::ReplayAction& action) override
437   {
438     std::string s = boost::algorithm::join(action, " ");
439     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
440     MPI_Request request = get_reqq_self()->back();
441     get_reqq_self()->pop_back();
442
443     if (request == nullptr) {
444       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
445        * return.*/
446       return;
447     }
448
449     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
450
451     // Must be taken before Request::wait() since the request may be set to
452     // MPI_REQUEST_NULL by Request::wait!
453     int src                  = request->comm()->group()->rank(request->src());
454     int dst                  = request->comm()->group()->rank(request->dst());
455     bool is_wait_for_receive = (request->flags() & RECV);
456     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
457     TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
458
459     MPI_Status status;
460     Request::wait(&request, &status);
461
462     TRACE_smpi_comm_out(rank);
463     if (is_wait_for_receive)
464       TRACE_smpi_recv(src, dst, 0);
465   }
466 };
467
468 class SendAction : public ReplayAction<SendRecvParser> {
469 public:
470   SendAction() = delete;
471   explicit SendAction(std::string name) : ReplayAction(name) {}
472   void kernel(simgrid::xbt::ReplayAction& action) override
473   {
474     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
475
476     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
477                                                                              Datatype::encode(args.datatype1)));
478     if (not TRACE_smpi_view_internals())
479       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
480
481     if (name == "send") {
482       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
483     } else if (name == "Isend") {
484       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
485       get_reqq_self()->push_back(request);
486     } else {
487       xbt_die("Don't know this action, %s", name.c_str());
488     }
489
490     TRACE_smpi_comm_out(my_proc_id);
491   }
492 };
493
494 class RecvAction : public ReplayAction<SendRecvParser> {
495 public:
496   RecvAction() = delete;
497   explicit RecvAction(std::string name) : ReplayAction(name) {}
498   void kernel(simgrid::xbt::ReplayAction& action) override
499   {
500     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
501
502     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
503                                                                              Datatype::encode(args.datatype1)));
504
505     MPI_Status status;
506     // unknown size from the receiver point of view
507     if (args.size <= 0.0) {
508       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
509       args.size = status.count;
510     }
511
512     if (name == "recv") {
513       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
514     } else if (name == "Irecv") {
515       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
516       get_reqq_self()->push_back(request);
517     }
518
519     TRACE_smpi_comm_out(my_proc_id);
520     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
521     if (name == "recv" && not TRACE_smpi_view_internals()) {
522       TRACE_smpi_recv(src_traced, my_proc_id, 0);
523     }
524   }
525 };
526
527 class ComputeAction : public ReplayAction<ComputeParser> {
528 public:
529   ComputeAction() : ReplayAction("compute") {}
530   void kernel(simgrid::xbt::ReplayAction& action) override
531   {
532     TRACE_smpi_computing_in(my_proc_id, args.flops);
533     smpi_execute_flops(args.flops);
534     TRACE_smpi_computing_out(my_proc_id);
535   }
536 };
537
538 class TestAction : public ReplayAction<ActionArgParser> {
539 public:
540   TestAction() : ReplayAction("Test") {}
541   void kernel(simgrid::xbt::ReplayAction& action) override
542   {
543     MPI_Request request = get_reqq_self()->back();
544     get_reqq_self()->pop_back();
545     // if request is null here, this may mean that a previous test has succeeded
546     // Different times in traced application and replayed version may lead to this
547     // In this case, ignore the extra calls.
548     if (request != nullptr) {
549       TRACE_smpi_testing_in(my_proc_id);
550
551       MPI_Status status;
552       int flag = Request::test(&request, &status);
553
554       XBT_DEBUG("MPI_Test result: %d", flag);
555       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
556        * nullptr.*/
557       get_reqq_self()->push_back(request);
558
559       TRACE_smpi_testing_out(my_proc_id);
560     }
561   }
562 };
563
564 class InitAction : public ReplayAction<ActionArgParser> {
565 public:
566   InitAction() : ReplayAction("Init") {}
567   void kernel(simgrid::xbt::ReplayAction& action) override
568   {
569     CHECK_ACTION_PARAMS(action, 0, 1)
570     MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
571                                            : MPI_BYTE;  // default TAU datatype
572
573     /* start a simulated timer */
574     smpi_process()->simulated_start();
575     set_reqq_self(new std::vector<MPI_Request>);
576   }
577 };
578
579 class CommunicatorAction : public ReplayAction<ActionArgParser> {
580 public:
581   CommunicatorAction() : ReplayAction("Comm") {}
582   void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
583 };
584
585 class WaitAllAction : public ReplayAction<ActionArgParser> {
586 public:
587   WaitAllAction() : ReplayAction("waitAll") {}
588   void kernel(simgrid::xbt::ReplayAction& action) override
589   {
590     const unsigned int count_requests = get_reqq_self()->size();
591
592     if (count_requests > 0) {
593       TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
594       std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
595       for (const auto& req : (*get_reqq_self())) {
596         if (req && (req->flags() & RECV)) {
597           sender_receiver.push_back({req->src(), req->dst()});
598         }
599       }
600       MPI_Status status[count_requests];
601       Request::waitall(count_requests, &(*get_reqq_self())[0], status);
602
603       for (auto& pair : sender_receiver) {
604         TRACE_smpi_recv(pair.first, pair.second, 0);
605       }
606       TRACE_smpi_comm_out(my_proc_id);
607     }
608   }
609 };
610
611 class BarrierAction : public ReplayAction<ActionArgParser> {
612 public:
613   BarrierAction() : ReplayAction("barrier") {}
614   void kernel(simgrid::xbt::ReplayAction& action) override
615   {
616     TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
617     Colls::barrier(MPI_COMM_WORLD);
618     TRACE_smpi_comm_out(my_proc_id);
619   }
620 };
621
622 class BcastAction : public ReplayAction<BcastArgParser> {
623 public:
624   BcastAction() : ReplayAction("bcast") {}
625   void kernel(simgrid::xbt::ReplayAction& action) override
626   {
627     TRACE_smpi_comm_in(my_proc_id, "action_bcast",
628                        new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
629                                                       -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
630
631     Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
632
633     TRACE_smpi_comm_out(my_proc_id);
634   }
635 };
636
637 class ReduceAction : public ReplayAction<ReduceArgParser> {
638 public:
639   ReduceAction() : ReplayAction("reduce") {}
640   void kernel(simgrid::xbt::ReplayAction& action) override
641   {
642     TRACE_smpi_comm_in(my_proc_id, "action_reduce",
643                        new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
644                                                       args.comp_size, args.comm_size, -1,
645                                                       Datatype::encode(args.datatype1), ""));
646
647     Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
648         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
649     smpi_execute_flops(args.comp_size);
650
651     TRACE_smpi_comm_out(my_proc_id);
652   }
653 };
654
655 class AllReduceAction : public ReplayAction<AllReduceArgParser> {
656 public:
657   AllReduceAction() : ReplayAction("allReduce") {}
658   void kernel(simgrid::xbt::ReplayAction& action) override
659   {
660     TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
661                                                                                 Datatype::encode(args.datatype1), ""));
662
663     Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
664         recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
665     smpi_execute_flops(args.comp_size);
666
667     TRACE_smpi_comm_out(my_proc_id);
668   }
669 };
670
671 class AllToAllAction : public ReplayAction<AllToAllArgParser> {
672 public:
673   AllToAllAction() : ReplayAction("allToAll") {}
674   void kernel(simgrid::xbt::ReplayAction& action) override
675   {
676     TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
677                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
678                                                     Datatype::encode(args.datatype1),
679                                                     Datatype::encode(args.datatype2)));
680
681     Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
682                     args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
683                     args.recv_size, args.datatype2, MPI_COMM_WORLD);
684
685     TRACE_smpi_comm_out(my_proc_id);
686   }
687 };
688
689 class GatherAction : public ReplayAction<GatherArgParser> {
690 public:
691   explicit GatherAction(std::string name) : ReplayAction(name) {}
692   void kernel(simgrid::xbt::ReplayAction& action) override
693   {
694     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
695                                                                           Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
696
697     if (name == "gather") {
698       int rank = MPI_COMM_WORLD->rank();
699       Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
700                  (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
701     }
702     else
703       Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
704                        recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
705
706     TRACE_smpi_comm_out(my_proc_id);
707   }
708 };
709
710 class GatherVAction : public ReplayAction<GatherVArgParser> {
711 public:
712   explicit GatherVAction(std::string name) : ReplayAction(name) {}
713   void kernel(simgrid::xbt::ReplayAction& action) override
714   {
715     int rank = MPI_COMM_WORLD->rank();
716
717     TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
718                                                name, (name == "gatherV") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
719                                                Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
720
721     if (name == "gatherV") {
722       Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
723                      (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
724                      args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
725     }
726     else {
727       Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
728                         recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
729                         args.disps.data(), args.datatype2, MPI_COMM_WORLD);
730     }
731
732     TRACE_smpi_comm_out(my_proc_id);
733   }
734 };
735
736 class ScatterAction : public ReplayAction<ScatterArgParser> {
737 public:
738   ScatterAction() : ReplayAction("scatter") {}
739   void kernel(simgrid::xbt::ReplayAction& action) override
740   {
741     int rank = MPI_COMM_WORLD->rank();
742     TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
743                                                                           Datatype::encode(args.datatype1),
744                                                                           Datatype::encode(args.datatype2)));
745
746     Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
747                   (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
748
749     TRACE_smpi_comm_out(my_proc_id);
750   }
751 };
752
753
754 class ScatterVAction : public ReplayAction<ScatterVArgParser> {
755 public:
756   ScatterVAction() : ReplayAction("scatterV") {}
757   void kernel(simgrid::xbt::ReplayAction& action) override
758   {
759     int rank = MPI_COMM_WORLD->rank();
760     TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
761           nullptr, Datatype::encode(args.datatype1),
762           Datatype::encode(args.datatype2)));
763
764     Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
765                     args.sendcounts->data(), args.disps.data(), args.datatype1,
766                     recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
767                     MPI_COMM_WORLD);
768
769     TRACE_smpi_comm_out(my_proc_id);
770   }
771 };
772
773 class ReduceScatterAction : public ReplayAction<ReduceScatterArgParser> {
774 public:
775   ReduceScatterAction() : ReplayAction("reduceScatter") {}
776   void kernel(simgrid::xbt::ReplayAction& action) override
777   {
778     TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
779                        new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, args.recvcounts,
780                                                          std::to_string(args.comp_size), /* ugly hack to print comp_size */
781                                                          Datatype::encode(args.datatype1)));
782
783     Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
784                           recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
785                           args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
786
787     smpi_execute_flops(args.comp_size);
788     TRACE_smpi_comm_out(my_proc_id);
789   }
790 };
791
792 class AllToAllVAction : public ReplayAction<AllToAllVArgParser> {
793 public:
794   AllToAllVAction() : ReplayAction("allToAllV") {}
795   void kernel(simgrid::xbt::ReplayAction& action) override
796   {
797     TRACE_smpi_comm_in(my_proc_id, __func__,
798                        new simgrid::instr::VarCollTIData(
799                            "allToAllV", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
800                            Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
801
802     Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
803                      recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
804
805     TRACE_smpi_comm_out(my_proc_id);
806   }
807 };
808 } // Replay Namespace
809 }} // namespace simgrid::smpi
810
811 /** @brief Only initialize the replay, don't do it for real */
812 void smpi_replay_init(int* argc, char*** argv)
813 {
814   simgrid::smpi::Process::init(argc, argv);
815   smpi_process()->mark_as_initialized();
816   smpi_process()->set_replaying(true);
817
818   int my_proc_id = simgrid::s4u::this_actor::getPid();
819   TRACE_smpi_init(my_proc_id);
820   TRACE_smpi_computing_init(my_proc_id);
821   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
822   TRACE_smpi_comm_out(my_proc_id);
823   xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
824   xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
825   xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
826   xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
827   xbt_replay_action_register("comm_dup",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
828
829   xbt_replay_action_register("send",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send").execute(action); });
830   xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("Isend").execute(action); });
831   xbt_replay_action_register("recv",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv").execute(action); });
832   xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("Irecv").execute(action); });
833   xbt_replay_action_register("test",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction().execute(action); });
834   xbt_replay_action_register("wait",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction().execute(action); });
835   xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction().execute(action); });
836   xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
837   xbt_replay_action_register("bcast",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
838   xbt_replay_action_register("reduce",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
839   xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
840   xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
841   xbt_replay_action_register("allToAllV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
842   xbt_replay_action_register("gather",   [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
843   xbt_replay_action_register("scatter",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
844   xbt_replay_action_register("gatherV",  [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherV").execute(action); });
845   xbt_replay_action_register("scatterV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
846   xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allGather").execute(action); });
847   xbt_replay_action_register("allGatherV", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allGatherV").execute(action); });
848   xbt_replay_action_register("reduceScatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
849   xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
850
851   //if we have a delayed start, sleep here.
852   if(*argc>2){
853     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
854     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
855     smpi_execute_flops(value);
856   } else {
857     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
858     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
859     smpi_execute_flops(0.0);
860   }
861 }
862
863 /** @brief actually run the replay after initialization */
864 void smpi_replay_main(int* argc, char*** argv)
865 {
866   static int active_processes = 0;
867   active_processes++;
868   simgrid::xbt::replay_runner(*argc, *argv);
869
870   /* and now, finalize everything */
871   /* One active process will stop. Decrease the counter*/
872   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
873   if (not get_reqq_self()->empty()) {
874     unsigned int count_requests=get_reqq_self()->size();
875     MPI_Request requests[count_requests];
876     MPI_Status status[count_requests];
877     unsigned int i=0;
878
879     for (auto const& req : *get_reqq_self()) {
880       requests[i] = req;
881       i++;
882     }
883     simgrid::smpi::Request::waitall(count_requests, requests, status);
884   }
885   delete get_reqq_self();
886   active_processes--;
887
888   if(active_processes==0){
889     /* Last process alive speaking: end the simulated timer */
890     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
891     smpi_free_replay_tmp_buffers();
892   }
893
894   TRACE_smpi_comm_in(simgrid::s4u::this_actor::getPid(), "smpi_replay_run_finalize",
895                      new simgrid::instr::NoOpTIData("finalize"));
896
897   smpi_process()->finalize();
898
899   TRACE_smpi_comm_out(simgrid::s4u::this_actor::getPid());
900   TRACE_smpi_finalize(simgrid::s4u::this_actor::getPid());
901 }
902
903 /** @brief chain a replay initialization and a replay start */
904 void smpi_replay_run(int* argc, char*** argv)
905 {
906   smpi_replay_init(argc, argv);
907   smpi_replay_main(argc, argv);
908 }