Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Macro cosmetics
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 class ReplayActionArg {
43   ReplayActionArg() {}
44 };
45
46 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
47   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
48     std::string s = boost::algorithm::join(action, " ");
49     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
50   }
51 }
52
53 static std::vector<MPI_Request>* get_reqq_self()
54 {
55   return reqq.at(Actor::self()->getPid());
56 }
57
58 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
59 {
60    reqq.insert({Actor::self()->getPid(), mpi_request});
61 }
62
63 /* Helper function */
64 static double parse_double(std::string string)
65 {
66   return xbt_str_parse_double(string.c_str(), "%s is not a double");
67 }
68
69
70 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
71 static MPI_Datatype decode_datatype(std::string action)
72 {
73   return simgrid::smpi::Datatype::decode(const_cast<const char* const>(action.c_str()));
74 }
75
76 const char* encode_datatype(MPI_Datatype datatype)
77 {
78   if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
79     return "-1";
80
81   return datatype->encode();
82 }
83
84 namespace simgrid {
85 namespace smpi {
86
87 static void action_init(simgrid::xbt::ReplayAction& action)
88 {
89   XBT_DEBUG("Initialize the counters");
90   CHECK_ACTION_PARAMS(action, 0, 1)
91   if (action.size() > 2)
92     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
93   else
94     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
95
96   /* start a simulated timer */
97   smpi_process()->simulated_start();
98   /*initialize the number of active processes */
99   active_processes = smpi_process_count();
100
101   set_reqq_self(new std::vector<MPI_Request>);
102 }
103
104 static void action_finalize(simgrid::xbt::ReplayAction& action)
105 {
106   /* Nothing to do */
107 }
108
109 static void action_comm_size(simgrid::xbt::ReplayAction& action)
110 {
111   communicator_size = parse_double(action[2]);
112   log_timed_action (action, smpi_process()->simulated_elapsed());
113 }
114
115 static void action_comm_split(simgrid::xbt::ReplayAction& action)
116 {
117   log_timed_action (action, smpi_process()->simulated_elapsed());
118 }
119
120 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
121 {
122   log_timed_action (action, smpi_process()->simulated_elapsed());
123 }
124
125 static void action_compute(simgrid::xbt::ReplayAction& action)
126 {
127   CHECK_ACTION_PARAMS(action, 1, 0)
128   double clock = smpi_process()->simulated_elapsed();
129   double flops= parse_double(action[2]);
130   int my_proc_id = Actor::self()->getPid();
131
132   TRACE_smpi_computing_in(my_proc_id, flops);
133   smpi_execute_flops(flops);
134   TRACE_smpi_computing_out(my_proc_id);
135
136   log_timed_action (action, clock);
137 }
138
139 static void action_send(simgrid::xbt::ReplayAction& action)
140 {
141   CHECK_ACTION_PARAMS(action, 2, 1)
142   int to       = std::stoi(action[2]);
143   double size=parse_double(action[3]);
144   double clock = smpi_process()->simulated_elapsed();
145
146   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
147
148   int my_proc_id = Actor::self()->getPid();
149   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
150
151   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
152                      new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
153   if (not TRACE_smpi_view_internals())
154     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
155
156   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
157
158   TRACE_smpi_comm_out(my_proc_id);
159
160   log_timed_action(action, clock);
161 }
162
163 static void action_Isend(simgrid::xbt::ReplayAction& action)
164 {
165   CHECK_ACTION_PARAMS(action, 2, 1)
166   int to       = std::stoi(action[2]);
167   double size=parse_double(action[3]);
168   double clock = smpi_process()->simulated_elapsed();
169
170   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
171
172   int my_proc_id = Actor::self()->getPid();
173   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
174   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
175                      new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
176   if (not TRACE_smpi_view_internals())
177     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
178
179   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
180
181   TRACE_smpi_comm_out(my_proc_id);
182
183   get_reqq_self()->push_back(request);
184
185   log_timed_action (action, clock);
186 }
187
188 static void action_recv(simgrid::xbt::ReplayAction& action)
189 {
190   CHECK_ACTION_PARAMS(action, 2, 1)
191   int from     = std::stoi(action[2]);
192   double size=parse_double(action[3]);
193   double clock = smpi_process()->simulated_elapsed();
194   MPI_Status status;
195
196   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
197
198   int my_proc_id = Actor::self()->getPid();
199   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
200
201   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
202                      new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
203
204   //unknown size from the receiver point of view
205   if (size <= 0.0) {
206     Request::probe(from, 0, MPI_COMM_WORLD, &status);
207     size=status.count;
208   }
209
210   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
211
212   TRACE_smpi_comm_out(my_proc_id);
213   if (not TRACE_smpi_view_internals()) {
214     TRACE_smpi_recv(src_traced, my_proc_id, 0);
215   }
216
217   log_timed_action (action, clock);
218 }
219
220 static void action_Irecv(simgrid::xbt::ReplayAction& action)
221 {
222   CHECK_ACTION_PARAMS(action, 2, 1)
223   int from     = std::stoi(action[2]);
224   double size=parse_double(action[3]);
225   double clock = smpi_process()->simulated_elapsed();
226
227   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
228
229   int my_proc_id = Actor::self()->getPid();
230   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
231                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
232   MPI_Status status;
233   //unknow size from the receiver pov
234   if (size <= 0.0) {
235     Request::probe(from, 0, MPI_COMM_WORLD, &status);
236     size = status.count;
237   }
238
239   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
240
241   TRACE_smpi_comm_out(my_proc_id);
242   get_reqq_self()->push_back(request);
243
244   log_timed_action (action, clock);
245 }
246
247 static void action_test(simgrid::xbt::ReplayAction& action)
248 {
249   CHECK_ACTION_PARAMS(action, 0, 0)
250   double clock = smpi_process()->simulated_elapsed();
251   MPI_Status status;
252
253   MPI_Request request = get_reqq_self()->back();
254   get_reqq_self()->pop_back();
255   //if request is null here, this may mean that a previous test has succeeded
256   //Different times in traced application and replayed version may lead to this
257   //In this case, ignore the extra calls.
258   if(request!=nullptr){
259     int my_proc_id = Actor::self()->getPid();
260     TRACE_smpi_testing_in(my_proc_id);
261
262     int flag = Request::test(&request, &status);
263
264     XBT_DEBUG("MPI_Test result: %d", flag);
265     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
266     get_reqq_self()->push_back(request);
267
268     TRACE_smpi_testing_out(my_proc_id);
269   }
270   log_timed_action (action, clock);
271 }
272
273 static void action_wait(simgrid::xbt::ReplayAction& action)
274 {
275   CHECK_ACTION_PARAMS(action, 0, 0)
276   double clock = smpi_process()->simulated_elapsed();
277   MPI_Status status;
278
279   std::string s = boost::algorithm::join(action, " ");
280   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
281   MPI_Request request = get_reqq_self()->back();
282   get_reqq_self()->pop_back();
283
284   if (request==nullptr){
285     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
286     return;
287   }
288
289   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
290
291   MPI_Group group = request->comm()->group();
292   int src_traced = group->rank(request->src());
293   int dst_traced = group->rank(request->dst());
294   int is_wait_for_receive = (request->flags() & RECV);
295   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
296
297   Request::wait(&request, &status);
298
299   TRACE_smpi_comm_out(rank);
300   if (is_wait_for_receive)
301     TRACE_smpi_recv(src_traced, dst_traced, 0);
302   log_timed_action (action, clock);
303 }
304
305 static void action_waitall(simgrid::xbt::ReplayAction& action)
306 {
307   CHECK_ACTION_PARAMS(action, 0, 0)
308   double clock = smpi_process()->simulated_elapsed();
309   const unsigned int count_requests = get_reqq_self()->size();
310
311   if (count_requests>0) {
312     MPI_Status status[count_requests];
313
314     int my_proc_id_traced = Actor::self()->getPid();
315     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
316                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
317     int recvs_snd[count_requests];
318     int recvs_rcv[count_requests];
319     for (unsigned int i = 0; i < count_requests; i++) {
320       const auto& req = (*get_reqq_self())[i];
321       if (req && (req->flags() & RECV)) {
322         recvs_snd[i] = req->src();
323         recvs_rcv[i] = req->dst();
324       } else
325         recvs_snd[i] = -100;
326    }
327    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
328
329    for (unsigned i = 0; i < count_requests; i++) {
330      if (recvs_snd[i]!=-100)
331        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
332    }
333    TRACE_smpi_comm_out(my_proc_id_traced);
334   }
335   log_timed_action (action, clock);
336 }
337
338 static void action_barrier(simgrid::xbt::ReplayAction& action)
339 {
340   double clock = smpi_process()->simulated_elapsed();
341   int my_proc_id = Actor::self()->getPid();
342   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
343
344   Colls::barrier(MPI_COMM_WORLD);
345
346   TRACE_smpi_comm_out(my_proc_id);
347   log_timed_action (action, clock);
348 }
349
350 static void action_bcast(simgrid::xbt::ReplayAction& action)
351 {
352   CHECK_ACTION_PARAMS(action, 1, 2)
353   double size = parse_double(action[2]);
354   double clock = smpi_process()->simulated_elapsed();
355   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
356   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
357   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
358
359   int my_proc_id = Actor::self()->getPid();
360   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
361                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
362                                                     -1, MPI_CURRENT_TYPE->encode(), ""));
363
364   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
365
366   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
367
368   TRACE_smpi_comm_out(my_proc_id);
369   log_timed_action (action, clock);
370 }
371
372 static void action_reduce(simgrid::xbt::ReplayAction& action)
373 {
374   CHECK_ACTION_PARAMS(action, 2, 2)
375   double comm_size = parse_double(action[2]);
376   double comp_size = parse_double(action[3]);
377   double clock = smpi_process()->simulated_elapsed();
378   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
379
380   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
381
382   int my_proc_id = Actor::self()->getPid();
383   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
384                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
385                                                     comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
386
387   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
388   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
389   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
390   smpi_execute_flops(comp_size);
391
392   TRACE_smpi_comm_out(my_proc_id);
393   log_timed_action (action, clock);
394 }
395
396 static void action_allReduce(simgrid::xbt::ReplayAction& action)
397 {
398   CHECK_ACTION_PARAMS(action, 2, 1)
399   double comm_size = parse_double(action[2]);
400   double comp_size = parse_double(action[3]);
401
402   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
403
404   double clock = smpi_process()->simulated_elapsed();
405   int my_proc_id = Actor::self()->getPid();
406   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
407                                                                               MPI_CURRENT_TYPE->encode(), ""));
408
409   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
410   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
411   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
412   smpi_execute_flops(comp_size);
413
414   TRACE_smpi_comm_out(my_proc_id);
415   log_timed_action (action, clock);
416 }
417
418 static void action_allToAll(simgrid::xbt::ReplayAction& action)
419 {
420   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
421   double clock = smpi_process()->simulated_elapsed();
422   unsigned long comm_size = MPI_COMM_WORLD->size();
423   int send_size = parse_double(action[2]);
424   int recv_size = parse_double(action[3]);
425   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
426   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
427
428   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
429   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
430
431   int my_proc_id = Actor::self()->getPid();
432   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
433                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
434                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
435
436   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
437
438   TRACE_smpi_comm_out(my_proc_id);
439   log_timed_action (action, clock);
440 }
441
442 static void action_gather(simgrid::xbt::ReplayAction& action)
443 {
444   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
445         0 gather 68 68 0 0 0
446       where:
447         1) 68 is the sendcounts
448         2) 68 is the recvcounts
449         3) 0 is the root node
450         4) 0 is the send datatype id, see decode_datatype()
451         5) 0 is the recv datatype id, see decode_datatype()
452   */
453   CHECK_ACTION_PARAMS(action, 2, 3)
454   double clock = smpi_process()->simulated_elapsed();
455   unsigned long comm_size = MPI_COMM_WORLD->size();
456   int send_size = parse_double(action[2]);
457   int recv_size = parse_double(action[3]);
458   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
459   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
460
461   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
462   void *recv = nullptr;
463   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
464   int rank = MPI_COMM_WORLD->rank();
465
466   if(rank==root)
467     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
468
469   TRACE_smpi_comm_in(rank, __FUNCTION__,
470                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
471                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
472
473   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
474
475   TRACE_smpi_comm_out(Actor::self()->getPid());
476   log_timed_action (action, clock);
477 }
478
479 static void action_scatter(simgrid::xbt::ReplayAction& action)
480 {
481   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
482         0 gather 68 68 0 0 0
483       where:
484         1) 68 is the sendcounts
485         2) 68 is the recvcounts
486         3) 0 is the root node
487         4) 0 is the send datatype id, see decode_datatype()
488         5) 0 is the recv datatype id, see decode_datatype()
489   */
490   CHECK_ACTION_PARAMS(action, 2, 3)
491   double clock                   = smpi_process()->simulated_elapsed();
492   unsigned long comm_size        = MPI_COMM_WORLD->size();
493   int send_size                  = parse_double(action[2]);
494   int recv_size                  = parse_double(action[3]);
495   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
496   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
497
498   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
499   void* recv = nullptr;
500   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
501   int rank = MPI_COMM_WORLD->rank();
502
503   if (rank == root)
504     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
505
506   TRACE_smpi_comm_in(rank, __FUNCTION__,
507                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
508                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
509
510   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
511
512   TRACE_smpi_comm_out(Actor::self()->getPid());
513   log_timed_action(action, clock);
514 }
515
516 static void action_gatherv(simgrid::xbt::ReplayAction& action)
517 {
518   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
519        0 gather 68 68 10 10 10 0 0 0
520      where:
521        1) 68 is the sendcount
522        2) 68 10 10 10 is the recvcounts
523        3) 0 is the root node
524        4) 0 is the send datatype id, see decode_datatype()
525        5) 0 is the recv datatype id, see decode_datatype()
526   */
527   double clock = smpi_process()->simulated_elapsed();
528   unsigned long comm_size = MPI_COMM_WORLD->size();
529   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
530   int send_size = parse_double(action[2]);
531   std::vector<int> disps(comm_size, 0);
532   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
533
534   MPI_Datatype MPI_CURRENT_TYPE =
535       (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
536   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
537                                                                  : MPI_DEFAULT_TYPE};
538
539   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
540   void *recv = nullptr;
541   for (unsigned int i = 0; i < comm_size; i++) {
542     (*recvcounts)[i] = std::stoi(action[i + 3]);
543   }
544   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
545
546   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
547   int rank = MPI_COMM_WORLD->rank();
548
549   if(rank==root)
550     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
551
552   TRACE_smpi_comm_in(rank, __FUNCTION__,
553                      new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
554                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
555
556   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
557                  MPI_COMM_WORLD);
558
559   TRACE_smpi_comm_out(Actor::self()->getPid());
560   log_timed_action (action, clock);
561 }
562
563 static void action_scatterv(simgrid::xbt::ReplayAction& action)
564 {
565   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
566        0 gather 68 10 10 10 68 0 0 0
567      where:
568        1) 68 10 10 10 is the sendcounts
569        2) 68 is the recvcount
570        3) 0 is the root node
571        4) 0 is the send datatype id, see decode_datatype()
572        5) 0 is the recv datatype id, see decode_datatype()
573   */
574   double clock  = smpi_process()->simulated_elapsed();
575   unsigned long comm_size = MPI_COMM_WORLD->size();
576   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
577   int recv_size = parse_double(action[2 + comm_size]);
578   std::vector<int> disps(comm_size, 0);
579   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
580
581   MPI_Datatype MPI_CURRENT_TYPE =
582       (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
583   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
584                                                                  : MPI_DEFAULT_TYPE};
585
586   void* send = nullptr;
587   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
588   for (unsigned int i = 0; i < comm_size; i++) {
589     (*sendcounts)[i] = std::stoi(action[i + 2]);
590   }
591   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
592
593   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
594   int rank = MPI_COMM_WORLD->rank();
595
596   if (rank == root)
597     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
598
599   TRACE_smpi_comm_in(rank, __FUNCTION__,
600                      new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
601                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
602
603   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
604                   MPI_COMM_WORLD);
605
606   TRACE_smpi_comm_out(Actor::self()->getPid());
607   log_timed_action(action, clock);
608 }
609
610 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
611 {
612   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
613        0 reduceScatter 275427 275427 275427 204020 11346849 0
614      where:
615        1) The first four values after the name of the action declare the recvcounts array
616        2) The value 11346849 is the amount of instructions
617        3) The last value corresponds to the datatype, see decode_datatype().
618  */
619   double clock = smpi_process()->simulated_elapsed();
620   unsigned long comm_size = MPI_COMM_WORLD->size();
621   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
622   int comp_size = parse_double(action[2+comm_size]);
623   int my_proc_id                     = Actor::self()->getPid();
624   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
625   MPI_Datatype MPI_CURRENT_TYPE =
626       (action.size() > 3 + comm_size) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
627
628   for (unsigned int i = 0; i < comm_size; i++) {
629     recvcounts->push_back(std::stoi(action[i + 2]));
630   }
631   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
632
633   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
634                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
635                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
636                                                        MPI_CURRENT_TYPE->encode()));
637
638   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
639   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
640
641   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
642   smpi_execute_flops(comp_size);
643
644   TRACE_smpi_comm_out(my_proc_id);
645   log_timed_action (action, clock);
646 }
647
648 static void action_allgather(simgrid::xbt::ReplayAction& action)
649 {
650   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
651         0 allGather 275427 275427
652     where:
653         1) 275427 is the sendcount
654         2) 275427 is the recvcount
655         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
656   */
657   double clock = smpi_process()->simulated_elapsed();
658
659   CHECK_ACTION_PARAMS(action, 2, 2)
660   int sendcount = std::stoi(action[2]);
661   int recvcount = std::stoi(action[3]);
662
663   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
664   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
665
666   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
667   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
668
669   int my_proc_id = Actor::self()->getPid();
670
671   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
672                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
673                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
674
675   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
676
677   TRACE_smpi_comm_out(my_proc_id);
678   log_timed_action (action, clock);
679 }
680
681 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
682 {
683   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
684         0 allGatherV 275427 275427 275427 275427 204020
685      where:
686         1) 275427 is the sendcount
687         2) The next four elements declare the recvcounts array
688         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
689   */
690   double clock = smpi_process()->simulated_elapsed();
691
692   unsigned long comm_size = MPI_COMM_WORLD->size();
693   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
694   int sendcount = std::stoi(action[2]);
695   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
696   std::vector<int> disps(comm_size, 0);
697
698   int datatype_index = 0, disp_index = 0;
699   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
700     datatype_index = 3 + comm_size;
701     disp_index     = datatype_index + 1;
702   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
703     datatype_index = -1;
704     disp_index     = 3 + comm_size;
705   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
706     datatype_index = 3 + comm_size;
707   }
708
709   if (disp_index != 0) {
710     for (unsigned int i = 0; i < comm_size; i++)
711       disps[i]          = std::stoi(action[disp_index + i]);
712   }
713
714   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
715   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
716
717   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
718
719   for (unsigned int i = 0; i < comm_size; i++) {
720     (*recvcounts)[i] = std::stoi(action[i + 3]);
721   }
722   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
723   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
724
725   int my_proc_id = Actor::self()->getPid();
726
727   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
728                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
729                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
730
731   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
732                     MPI_COMM_WORLD);
733
734   TRACE_smpi_comm_out(my_proc_id);
735   log_timed_action (action, clock);
736 }
737
738 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
739 {
740   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
741         0 allToAllV 100 1 7 10 12 100 1 70 10 5
742      where:
743         1) 100 is the size of the send buffer *sizeof(int),
744         2) 1 7 10 12 is the sendcounts array
745         3) 100*sizeof(int) is the size of the receiver buffer
746         4)  1 70 10 5 is the recvcounts array
747   */
748   double clock = smpi_process()->simulated_elapsed();
749
750   unsigned long comm_size = MPI_COMM_WORLD->size();
751   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
752   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
753   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
754   std::vector<int> senddisps(comm_size, 0);
755   std::vector<int> recvdisps(comm_size, 0);
756
757   MPI_Datatype MPI_CURRENT_TYPE =
758       (action.size() > 5 + 2 * comm_size) ? decode_datatype(action[4 + 2 * comm_size]) : MPI_DEFAULT_TYPE;
759   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size) ? decode_datatype(action[5 + 2 * comm_size])
760                                                                      : MPI_DEFAULT_TYPE};
761
762   int send_buf_size=parse_double(action[2]);
763   int recv_buf_size=parse_double(action[3+comm_size]);
764   int my_proc_id = Actor::self()->getPid();
765   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
766   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
767
768   for (unsigned int i = 0; i < comm_size; i++) {
769     (*sendcounts)[i] = std::stoi(action[3 + i]);
770     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
771   }
772   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
773   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
774
775   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
776                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
777                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
778
779   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
780                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
781
782   TRACE_smpi_comm_out(my_proc_id);
783   log_timed_action (action, clock);
784 }
785
786 }} // namespace simgrid::smpi
787
788 /** @brief Only initialize the replay, don't do it for real */
789 void smpi_replay_init(int* argc, char*** argv)
790 {
791   simgrid::smpi::Process::init(argc, argv);
792   smpi_process()->mark_as_initialized();
793   smpi_process()->set_replaying(true);
794
795   int my_proc_id = Actor::self()->getPid();
796   TRACE_smpi_init(my_proc_id);
797   TRACE_smpi_computing_init(my_proc_id);
798   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
799   TRACE_smpi_comm_out(my_proc_id);
800   xbt_replay_action_register("init",       simgrid::smpi::action_init);
801   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
802   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
803   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
804   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
805   xbt_replay_action_register("send",       simgrid::smpi::action_send);
806   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
807   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
808   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
809   xbt_replay_action_register("test",       simgrid::smpi::action_test);
810   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
811   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
812   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
813   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
814   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
815   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
816   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
817   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
818   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
819   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
820   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
821   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
822   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
823   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
824   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
825   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
826
827   //if we have a delayed start, sleep here.
828   if(*argc>2){
829     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
830     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
831     smpi_execute_flops(value);
832   } else {
833     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
834     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
835     smpi_execute_flops(0.0);
836   }
837 }
838
839 /** @brief actually run the replay after initialization */
840 void smpi_replay_main(int* argc, char*** argv)
841 {
842   simgrid::xbt::replay_runner(*argc, *argv);
843
844   /* and now, finalize everything */
845   /* One active process will stop. Decrease the counter*/
846   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
847   if (not get_reqq_self()->empty()) {
848     unsigned int count_requests=get_reqq_self()->size();
849     MPI_Request requests[count_requests];
850     MPI_Status status[count_requests];
851     unsigned int i=0;
852
853     for (auto const& req : *get_reqq_self()) {
854       requests[i] = req;
855       i++;
856     }
857     simgrid::smpi::Request::waitall(count_requests, requests, status);
858   }
859   delete get_reqq_self();
860   active_processes--;
861
862   if(active_processes==0){
863     /* Last process alive speaking: end the simulated timer */
864     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
865     smpi_free_replay_tmp_buffers();
866   }
867
868   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
869
870   smpi_process()->finalize();
871
872   TRACE_smpi_comm_out(Actor::self()->getPid());
873   TRACE_smpi_finalize(Actor::self()->getPid());
874 }
875
876 /** @brief chain a replay initialization and a replay start */
877 void smpi_replay_run(int* argc, char*** argv)
878 {
879   smpi_replay_init(argc, argv);
880   smpi_replay_main(argc, argv);
881 }