Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Remove class definition...
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65
66 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
67 static MPI_Datatype decode_datatype(std::string action)
68 {
69   return simgrid::smpi::Datatype::decode(const_cast<const char* const>(action.c_str()));
70 }
71
72 const char* encode_datatype(MPI_Datatype datatype)
73 {
74   if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
75     return "-1";
76
77   return datatype->encode();
78 }
79
80 namespace simgrid {
81 namespace smpi {
82
83 static void action_init(simgrid::xbt::ReplayAction& action)
84 {
85   XBT_DEBUG("Initialize the counters");
86   CHECK_ACTION_PARAMS(action, 0, 1)
87   if (action.size() > 2)
88     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
89   else
90     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
91
92   /* start a simulated timer */
93   smpi_process()->simulated_start();
94   /*initialize the number of active processes */
95   active_processes = smpi_process_count();
96
97   set_reqq_self(new std::vector<MPI_Request>);
98 }
99
100 static void action_finalize(simgrid::xbt::ReplayAction& action)
101 {
102   /* Nothing to do */
103 }
104
105 static void action_comm_size(simgrid::xbt::ReplayAction& action)
106 {
107   communicator_size = parse_double(action[2]);
108   log_timed_action (action, smpi_process()->simulated_elapsed());
109 }
110
111 static void action_comm_split(simgrid::xbt::ReplayAction& action)
112 {
113   log_timed_action (action, smpi_process()->simulated_elapsed());
114 }
115
116 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
117 {
118   log_timed_action (action, smpi_process()->simulated_elapsed());
119 }
120
121 static void action_compute(simgrid::xbt::ReplayAction& action)
122 {
123   CHECK_ACTION_PARAMS(action, 1, 0)
124   double clock = smpi_process()->simulated_elapsed();
125   double flops= parse_double(action[2]);
126   int my_proc_id = Actor::self()->getPid();
127
128   TRACE_smpi_computing_in(my_proc_id, flops);
129   smpi_execute_flops(flops);
130   TRACE_smpi_computing_out(my_proc_id);
131
132   log_timed_action (action, clock);
133 }
134
135 static void action_send(simgrid::xbt::ReplayAction& action)
136 {
137   CHECK_ACTION_PARAMS(action, 2, 1)
138   int to       = std::stoi(action[2]);
139   double size=parse_double(action[3]);
140   double clock = smpi_process()->simulated_elapsed();
141
142   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
143
144   int my_proc_id = Actor::self()->getPid();
145   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
146
147   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
148                      new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
149   if (not TRACE_smpi_view_internals())
150     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
151
152   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
153
154   TRACE_smpi_comm_out(my_proc_id);
155
156   log_timed_action(action, clock);
157 }
158
159 static void action_Isend(simgrid::xbt::ReplayAction& action)
160 {
161   CHECK_ACTION_PARAMS(action, 2, 1)
162   int to       = std::stoi(action[2]);
163   double size=parse_double(action[3]);
164   double clock = smpi_process()->simulated_elapsed();
165
166   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
167
168   int my_proc_id = Actor::self()->getPid();
169   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
170   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
171                      new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
172   if (not TRACE_smpi_view_internals())
173     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
174
175   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
176
177   TRACE_smpi_comm_out(my_proc_id);
178
179   get_reqq_self()->push_back(request);
180
181   log_timed_action (action, clock);
182 }
183
184 static void action_recv(simgrid::xbt::ReplayAction& action)
185 {
186   CHECK_ACTION_PARAMS(action, 2, 1)
187   int from     = std::stoi(action[2]);
188   double size=parse_double(action[3]);
189   double clock = smpi_process()->simulated_elapsed();
190   MPI_Status status;
191
192   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
193
194   int my_proc_id = Actor::self()->getPid();
195   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
196
197   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
198                      new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
199
200   //unknown size from the receiver point of view
201   if (size <= 0.0) {
202     Request::probe(from, 0, MPI_COMM_WORLD, &status);
203     size=status.count;
204   }
205
206   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
207
208   TRACE_smpi_comm_out(my_proc_id);
209   if (not TRACE_smpi_view_internals()) {
210     TRACE_smpi_recv(src_traced, my_proc_id, 0);
211   }
212
213   log_timed_action (action, clock);
214 }
215
216 static void action_Irecv(simgrid::xbt::ReplayAction& action)
217 {
218   CHECK_ACTION_PARAMS(action, 2, 1)
219   int from     = std::stoi(action[2]);
220   double size=parse_double(action[3]);
221   double clock = smpi_process()->simulated_elapsed();
222
223   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
224
225   int my_proc_id = Actor::self()->getPid();
226   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
227                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
228   MPI_Status status;
229   //unknow size from the receiver pov
230   if (size <= 0.0) {
231     Request::probe(from, 0, MPI_COMM_WORLD, &status);
232     size = status.count;
233   }
234
235   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
236
237   TRACE_smpi_comm_out(my_proc_id);
238   get_reqq_self()->push_back(request);
239
240   log_timed_action (action, clock);
241 }
242
243 static void action_test(simgrid::xbt::ReplayAction& action)
244 {
245   CHECK_ACTION_PARAMS(action, 0, 0)
246   double clock = smpi_process()->simulated_elapsed();
247   MPI_Status status;
248
249   MPI_Request request = get_reqq_self()->back();
250   get_reqq_self()->pop_back();
251   //if request is null here, this may mean that a previous test has succeeded
252   //Different times in traced application and replayed version may lead to this
253   //In this case, ignore the extra calls.
254   if(request!=nullptr){
255     int my_proc_id = Actor::self()->getPid();
256     TRACE_smpi_testing_in(my_proc_id);
257
258     int flag = Request::test(&request, &status);
259
260     XBT_DEBUG("MPI_Test result: %d", flag);
261     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
262     get_reqq_self()->push_back(request);
263
264     TRACE_smpi_testing_out(my_proc_id);
265   }
266   log_timed_action (action, clock);
267 }
268
269 static void action_wait(simgrid::xbt::ReplayAction& action)
270 {
271   CHECK_ACTION_PARAMS(action, 0, 0)
272   double clock = smpi_process()->simulated_elapsed();
273   MPI_Status status;
274
275   std::string s = boost::algorithm::join(action, " ");
276   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
277   MPI_Request request = get_reqq_self()->back();
278   get_reqq_self()->pop_back();
279
280   if (request==nullptr){
281     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
282     return;
283   }
284
285   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
286
287   MPI_Group group = request->comm()->group();
288   int src_traced = group->rank(request->src());
289   int dst_traced = group->rank(request->dst());
290   int is_wait_for_receive = (request->flags() & RECV);
291   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
292
293   Request::wait(&request, &status);
294
295   TRACE_smpi_comm_out(rank);
296   if (is_wait_for_receive)
297     TRACE_smpi_recv(src_traced, dst_traced, 0);
298   log_timed_action (action, clock);
299 }
300
301 static void action_waitall(simgrid::xbt::ReplayAction& action)
302 {
303   CHECK_ACTION_PARAMS(action, 0, 0)
304   double clock = smpi_process()->simulated_elapsed();
305   const unsigned int count_requests = get_reqq_self()->size();
306
307   if (count_requests>0) {
308     MPI_Status status[count_requests];
309
310     int my_proc_id_traced = Actor::self()->getPid();
311     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
312                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
313     int recvs_snd[count_requests];
314     int recvs_rcv[count_requests];
315     for (unsigned int i = 0; i < count_requests; i++) {
316       const auto& req = (*get_reqq_self())[i];
317       if (req && (req->flags() & RECV)) {
318         recvs_snd[i] = req->src();
319         recvs_rcv[i] = req->dst();
320       } else
321         recvs_snd[i] = -100;
322    }
323    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
324
325    for (unsigned i = 0; i < count_requests; i++) {
326      if (recvs_snd[i]!=-100)
327        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
328    }
329    TRACE_smpi_comm_out(my_proc_id_traced);
330   }
331   log_timed_action (action, clock);
332 }
333
334 static void action_barrier(simgrid::xbt::ReplayAction& action)
335 {
336   double clock = smpi_process()->simulated_elapsed();
337   int my_proc_id = Actor::self()->getPid();
338   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
339
340   Colls::barrier(MPI_COMM_WORLD);
341
342   TRACE_smpi_comm_out(my_proc_id);
343   log_timed_action (action, clock);
344 }
345
346 static void action_bcast(simgrid::xbt::ReplayAction& action)
347 {
348   CHECK_ACTION_PARAMS(action, 1, 2)
349   double size = parse_double(action[2]);
350   double clock = smpi_process()->simulated_elapsed();
351   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
352   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
353   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
354
355   int my_proc_id = Actor::self()->getPid();
356   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
357                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
358                                                     -1, MPI_CURRENT_TYPE->encode(), ""));
359
360   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
361
362   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
363
364   TRACE_smpi_comm_out(my_proc_id);
365   log_timed_action (action, clock);
366 }
367
368 static void action_reduce(simgrid::xbt::ReplayAction& action)
369 {
370   CHECK_ACTION_PARAMS(action, 2, 2)
371   double comm_size = parse_double(action[2]);
372   double comp_size = parse_double(action[3]);
373   double clock = smpi_process()->simulated_elapsed();
374   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
375
376   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
377
378   int my_proc_id = Actor::self()->getPid();
379   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
380                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
381                                                     comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
382
383   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
384   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
385   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
386   smpi_execute_flops(comp_size);
387
388   TRACE_smpi_comm_out(my_proc_id);
389   log_timed_action (action, clock);
390 }
391
392 static void action_allReduce(simgrid::xbt::ReplayAction& action)
393 {
394   CHECK_ACTION_PARAMS(action, 2, 1)
395   double comm_size = parse_double(action[2]);
396   double comp_size = parse_double(action[3]);
397
398   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
399
400   double clock = smpi_process()->simulated_elapsed();
401   int my_proc_id = Actor::self()->getPid();
402   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
403                                                                               MPI_CURRENT_TYPE->encode(), ""));
404
405   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
406   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
407   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
408   smpi_execute_flops(comp_size);
409
410   TRACE_smpi_comm_out(my_proc_id);
411   log_timed_action (action, clock);
412 }
413
414 static void action_allToAll(simgrid::xbt::ReplayAction& action)
415 {
416   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
417   double clock = smpi_process()->simulated_elapsed();
418   unsigned long comm_size = MPI_COMM_WORLD->size();
419   int send_size = parse_double(action[2]);
420   int recv_size = parse_double(action[3]);
421   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
422   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
423
424   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
425   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
426
427   int my_proc_id = Actor::self()->getPid();
428   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
430                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
431
432   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
433
434   TRACE_smpi_comm_out(my_proc_id);
435   log_timed_action (action, clock);
436 }
437
438 static void action_gather(simgrid::xbt::ReplayAction& action)
439 {
440   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
441         0 gather 68 68 0 0 0
442       where:
443         1) 68 is the sendcounts
444         2) 68 is the recvcounts
445         3) 0 is the root node
446         4) 0 is the send datatype id, see decode_datatype()
447         5) 0 is the recv datatype id, see decode_datatype()
448   */
449   CHECK_ACTION_PARAMS(action, 2, 3)
450   double clock = smpi_process()->simulated_elapsed();
451   unsigned long comm_size = MPI_COMM_WORLD->size();
452   int send_size = parse_double(action[2]);
453   int recv_size = parse_double(action[3]);
454   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
455   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
456
457   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
458   void *recv = nullptr;
459   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
460   int rank = MPI_COMM_WORLD->rank();
461
462   if(rank==root)
463     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
464
465   TRACE_smpi_comm_in(rank, __FUNCTION__,
466                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
467                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
468
469   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
470
471   TRACE_smpi_comm_out(Actor::self()->getPid());
472   log_timed_action (action, clock);
473 }
474
475 static void action_scatter(simgrid::xbt::ReplayAction& action)
476 {
477   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
478         0 gather 68 68 0 0 0
479       where:
480         1) 68 is the sendcounts
481         2) 68 is the recvcounts
482         3) 0 is the root node
483         4) 0 is the send datatype id, see decode_datatype()
484         5) 0 is the recv datatype id, see decode_datatype()
485   */
486   CHECK_ACTION_PARAMS(action, 2, 3)
487   double clock                   = smpi_process()->simulated_elapsed();
488   unsigned long comm_size        = MPI_COMM_WORLD->size();
489   int send_size                  = parse_double(action[2]);
490   int recv_size                  = parse_double(action[3]);
491   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
492   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
493
494   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
495   void* recv = nullptr;
496   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
497   int rank = MPI_COMM_WORLD->rank();
498
499   if (rank == root)
500     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
501
502   TRACE_smpi_comm_in(rank, __FUNCTION__,
503                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
504                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
505
506   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
507
508   TRACE_smpi_comm_out(Actor::self()->getPid());
509   log_timed_action(action, clock);
510 }
511
512 static void action_gatherv(simgrid::xbt::ReplayAction& action)
513 {
514   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
515        0 gather 68 68 10 10 10 0 0 0
516      where:
517        1) 68 is the sendcount
518        2) 68 10 10 10 is the recvcounts
519        3) 0 is the root node
520        4) 0 is the send datatype id, see decode_datatype()
521        5) 0 is the recv datatype id, see decode_datatype()
522   */
523   double clock = smpi_process()->simulated_elapsed();
524   unsigned long comm_size = MPI_COMM_WORLD->size();
525   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
526   int send_size = parse_double(action[2]);
527   std::vector<int> disps(comm_size, 0);
528   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
529
530   MPI_Datatype MPI_CURRENT_TYPE =
531       (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
532   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
533                                                                  : MPI_DEFAULT_TYPE};
534
535   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
536   void *recv = nullptr;
537   for (unsigned int i = 0; i < comm_size; i++) {
538     (*recvcounts)[i] = std::stoi(action[i + 3]);
539   }
540   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
541
542   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
543   int rank = MPI_COMM_WORLD->rank();
544
545   if(rank==root)
546     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
547
548   TRACE_smpi_comm_in(rank, __FUNCTION__,
549                      new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
550                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
551
552   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
553                  MPI_COMM_WORLD);
554
555   TRACE_smpi_comm_out(Actor::self()->getPid());
556   log_timed_action (action, clock);
557 }
558
559 static void action_scatterv(simgrid::xbt::ReplayAction& action)
560 {
561   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
562        0 gather 68 10 10 10 68 0 0 0
563      where:
564        1) 68 10 10 10 is the sendcounts
565        2) 68 is the recvcount
566        3) 0 is the root node
567        4) 0 is the send datatype id, see decode_datatype()
568        5) 0 is the recv datatype id, see decode_datatype()
569   */
570   double clock  = smpi_process()->simulated_elapsed();
571   unsigned long comm_size = MPI_COMM_WORLD->size();
572   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
573   int recv_size = parse_double(action[2 + comm_size]);
574   std::vector<int> disps(comm_size, 0);
575   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
576
577   MPI_Datatype MPI_CURRENT_TYPE =
578       (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
579   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
580                                                                  : MPI_DEFAULT_TYPE};
581
582   void* send = nullptr;
583   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
584   for (unsigned int i = 0; i < comm_size; i++) {
585     (*sendcounts)[i] = std::stoi(action[i + 2]);
586   }
587   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
588
589   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
590   int rank = MPI_COMM_WORLD->rank();
591
592   if (rank == root)
593     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
594
595   TRACE_smpi_comm_in(rank, __FUNCTION__,
596                      new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
597                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
598
599   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
600                   MPI_COMM_WORLD);
601
602   TRACE_smpi_comm_out(Actor::self()->getPid());
603   log_timed_action(action, clock);
604 }
605
606 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
607 {
608   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
609        0 reduceScatter 275427 275427 275427 204020 11346849 0
610      where:
611        1) The first four values after the name of the action declare the recvcounts array
612        2) The value 11346849 is the amount of instructions
613        3) The last value corresponds to the datatype, see decode_datatype().
614  */
615   double clock = smpi_process()->simulated_elapsed();
616   unsigned long comm_size = MPI_COMM_WORLD->size();
617   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
618   int comp_size = parse_double(action[2+comm_size]);
619   int my_proc_id                     = Actor::self()->getPid();
620   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
621   MPI_Datatype MPI_CURRENT_TYPE =
622       (action.size() > 3 + comm_size) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
623
624   for (unsigned int i = 0; i < comm_size; i++) {
625     recvcounts->push_back(std::stoi(action[i + 2]));
626   }
627   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
628
629   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
630                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
631                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
632                                                        MPI_CURRENT_TYPE->encode()));
633
634   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
635   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
636
637   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
638   smpi_execute_flops(comp_size);
639
640   TRACE_smpi_comm_out(my_proc_id);
641   log_timed_action (action, clock);
642 }
643
644 static void action_allgather(simgrid::xbt::ReplayAction& action)
645 {
646   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
647         0 allGather 275427 275427
648     where:
649         1) 275427 is the sendcount
650         2) 275427 is the recvcount
651         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
652   */
653   double clock = smpi_process()->simulated_elapsed();
654
655   CHECK_ACTION_PARAMS(action, 2, 2)
656   int sendcount = std::stoi(action[2]);
657   int recvcount = std::stoi(action[3]);
658
659   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
660   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
661
662   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
663   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
664
665   int my_proc_id = Actor::self()->getPid();
666
667   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
668                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
669                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
670
671   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
672
673   TRACE_smpi_comm_out(my_proc_id);
674   log_timed_action (action, clock);
675 }
676
677 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
678 {
679   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
680         0 allGatherV 275427 275427 275427 275427 204020
681      where:
682         1) 275427 is the sendcount
683         2) The next four elements declare the recvcounts array
684         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
685   */
686   double clock = smpi_process()->simulated_elapsed();
687
688   unsigned long comm_size = MPI_COMM_WORLD->size();
689   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
690   int sendcount = std::stoi(action[2]);
691   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
692   std::vector<int> disps(comm_size, 0);
693
694   int datatype_index = 0, disp_index = 0;
695   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
696     datatype_index = 3 + comm_size;
697     disp_index     = datatype_index + 1;
698   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
699     datatype_index = -1;
700     disp_index     = 3 + comm_size;
701   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
702     datatype_index = 3 + comm_size;
703   }
704
705   if (disp_index != 0) {
706     for (unsigned int i = 0; i < comm_size; i++)
707       disps[i]          = std::stoi(action[disp_index + i]);
708   }
709
710   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
711   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
712
713   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
714
715   for (unsigned int i = 0; i < comm_size; i++) {
716     (*recvcounts)[i] = std::stoi(action[i + 3]);
717   }
718   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
719   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
720
721   int my_proc_id = Actor::self()->getPid();
722
723   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
724                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
725                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
726
727   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
728                     MPI_COMM_WORLD);
729
730   TRACE_smpi_comm_out(my_proc_id);
731   log_timed_action (action, clock);
732 }
733
734 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
735 {
736   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
737         0 allToAllV 100 1 7 10 12 100 1 70 10 5
738      where:
739         1) 100 is the size of the send buffer *sizeof(int),
740         2) 1 7 10 12 is the sendcounts array
741         3) 100*sizeof(int) is the size of the receiver buffer
742         4)  1 70 10 5 is the recvcounts array
743   */
744   double clock = smpi_process()->simulated_elapsed();
745
746   unsigned long comm_size = MPI_COMM_WORLD->size();
747   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
748   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
749   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
750   std::vector<int> senddisps(comm_size, 0);
751   std::vector<int> recvdisps(comm_size, 0);
752
753   MPI_Datatype MPI_CURRENT_TYPE =
754       (action.size() > 5 + 2 * comm_size) ? decode_datatype(action[4 + 2 * comm_size]) : MPI_DEFAULT_TYPE;
755   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size) ? decode_datatype(action[5 + 2 * comm_size])
756                                                                      : MPI_DEFAULT_TYPE};
757
758   int send_buf_size=parse_double(action[2]);
759   int recv_buf_size=parse_double(action[3+comm_size]);
760   int my_proc_id = Actor::self()->getPid();
761   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
762   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
763
764   for (unsigned int i = 0; i < comm_size; i++) {
765     (*sendcounts)[i] = std::stoi(action[3 + i]);
766     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
767   }
768   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
769   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
770
771   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
772                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
773                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
774
775   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
776                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
777
778   TRACE_smpi_comm_out(my_proc_id);
779   log_timed_action (action, clock);
780 }
781
782 }} // namespace simgrid::smpi
783
784 /** @brief Only initialize the replay, don't do it for real */
785 void smpi_replay_init(int* argc, char*** argv)
786 {
787   simgrid::smpi::Process::init(argc, argv);
788   smpi_process()->mark_as_initialized();
789   smpi_process()->set_replaying(true);
790
791   int my_proc_id = Actor::self()->getPid();
792   TRACE_smpi_init(my_proc_id);
793   TRACE_smpi_computing_init(my_proc_id);
794   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
795   TRACE_smpi_comm_out(my_proc_id);
796   xbt_replay_action_register("init",       simgrid::smpi::action_init);
797   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
798   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
799   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
800   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
801   xbt_replay_action_register("send",       simgrid::smpi::action_send);
802   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
803   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
804   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
805   xbt_replay_action_register("test",       simgrid::smpi::action_test);
806   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
807   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
808   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
809   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
810   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
811   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
812   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
813   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
814   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
815   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
816   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
817   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
818   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
819   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
820   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
821   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
822
823   //if we have a delayed start, sleep here.
824   if(*argc>2){
825     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
826     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
827     smpi_execute_flops(value);
828   } else {
829     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
830     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
831     smpi_execute_flops(0.0);
832   }
833 }
834
835 /** @brief actually run the replay after initialization */
836 void smpi_replay_main(int* argc, char*** argv)
837 {
838   simgrid::xbt::replay_runner(*argc, *argv);
839
840   /* and now, finalize everything */
841   /* One active process will stop. Decrease the counter*/
842   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
843   if (not get_reqq_self()->empty()) {
844     unsigned int count_requests=get_reqq_self()->size();
845     MPI_Request requests[count_requests];
846     MPI_Status status[count_requests];
847     unsigned int i=0;
848
849     for (auto const& req : *get_reqq_self()) {
850       requests[i] = req;
851       i++;
852     }
853     simgrid::smpi::Request::waitall(count_requests, requests, status);
854   }
855   delete get_reqq_self();
856   active_processes--;
857
858   if(active_processes==0){
859     /* Last process alive speaking: end the simulated timer */
860     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
861     smpi_free_replay_tmp_buffers();
862   }
863
864   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
865
866   smpi_process()->finalize();
867
868   TRACE_smpi_comm_out(Actor::self()->getPid());
869   TRACE_smpi_finalize(Actor::self()->getPid());
870 }
871
872 /** @brief chain a replay initialization and a replay start */
873 void smpi_replay_run(int* argc, char*** argv)
874 {
875   smpi_replay_init(argc, argv);
876   smpi_replay_main(argc, argv);
877 }