Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Datatype: Remove all encode_datatype calls
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
66 static MPI_Datatype decode_datatype(std::string action)
67 {
68   return simgrid::smpi::Datatype::decode(const_cast<const char* const>(action.c_str()));
69 }
70
71 namespace simgrid {
72 namespace smpi {
73
74 static void action_init(simgrid::xbt::ReplayAction& action)
75 {
76   XBT_DEBUG("Initialize the counters");
77   CHECK_ACTION_PARAMS(action, 0, 1)
78   if (action.size() > 2)
79     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
80   else
81     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
82
83   /* start a simulated timer */
84   smpi_process()->simulated_start();
85   /*initialize the number of active processes */
86   active_processes = smpi_process_count();
87
88   set_reqq_self(new std::vector<MPI_Request>);
89 }
90
91 static void action_finalize(simgrid::xbt::ReplayAction& action)
92 {
93   /* Nothing to do */
94 }
95
96 static void action_comm_size(simgrid::xbt::ReplayAction& action)
97 {
98   communicator_size = parse_double(action[2]);
99   log_timed_action (action, smpi_process()->simulated_elapsed());
100 }
101
102 static void action_comm_split(simgrid::xbt::ReplayAction& action)
103 {
104   log_timed_action (action, smpi_process()->simulated_elapsed());
105 }
106
107 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
108 {
109   log_timed_action (action, smpi_process()->simulated_elapsed());
110 }
111
112 static void action_compute(simgrid::xbt::ReplayAction& action)
113 {
114   CHECK_ACTION_PARAMS(action, 1, 0)
115   double clock = smpi_process()->simulated_elapsed();
116   double flops= parse_double(action[2]);
117   int my_proc_id = Actor::self()->getPid();
118
119   TRACE_smpi_computing_in(my_proc_id, flops);
120   smpi_execute_flops(flops);
121   TRACE_smpi_computing_out(my_proc_id);
122
123   log_timed_action (action, clock);
124 }
125
126 static void action_send(simgrid::xbt::ReplayAction& action)
127 {
128   CHECK_ACTION_PARAMS(action, 2, 1)
129   int to       = std::stoi(action[2]);
130   double size=parse_double(action[3]);
131   double clock = smpi_process()->simulated_elapsed();
132
133   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
134
135   int my_proc_id = Actor::self()->getPid();
136   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
137
138   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
139                      new simgrid::instr::Pt2PtTIData("send", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
140   if (not TRACE_smpi_view_internals())
141     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
142
143   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
144
145   TRACE_smpi_comm_out(my_proc_id);
146
147   log_timed_action(action, clock);
148 }
149
150 static void action_Isend(simgrid::xbt::ReplayAction& action)
151 {
152   CHECK_ACTION_PARAMS(action, 2, 1)
153   int to       = std::stoi(action[2]);
154   double size=parse_double(action[3]);
155   double clock = smpi_process()->simulated_elapsed();
156
157   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
158
159   int my_proc_id = Actor::self()->getPid();
160   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
161   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
162                      new simgrid::instr::Pt2PtTIData("Isend", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
163   if (not TRACE_smpi_view_internals())
164     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
165
166   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
167
168   TRACE_smpi_comm_out(my_proc_id);
169
170   get_reqq_self()->push_back(request);
171
172   log_timed_action (action, clock);
173 }
174
175 static void action_recv(simgrid::xbt::ReplayAction& action)
176 {
177   CHECK_ACTION_PARAMS(action, 2, 1)
178   int from     = std::stoi(action[2]);
179   double size=parse_double(action[3]);
180   double clock = smpi_process()->simulated_elapsed();
181   MPI_Status status;
182
183   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
184
185   int my_proc_id = Actor::self()->getPid();
186   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
187
188   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
189                      new simgrid::instr::Pt2PtTIData("recv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
190
191   //unknown size from the receiver point of view
192   if (size <= 0.0) {
193     Request::probe(from, 0, MPI_COMM_WORLD, &status);
194     size=status.count;
195   }
196
197   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
198
199   TRACE_smpi_comm_out(my_proc_id);
200   if (not TRACE_smpi_view_internals()) {
201     TRACE_smpi_recv(src_traced, my_proc_id, 0);
202   }
203
204   log_timed_action (action, clock);
205 }
206
207 static void action_Irecv(simgrid::xbt::ReplayAction& action)
208 {
209   CHECK_ACTION_PARAMS(action, 2, 1)
210   int from     = std::stoi(action[2]);
211   double size=parse_double(action[3]);
212   double clock = smpi_process()->simulated_elapsed();
213
214   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
215
216   int my_proc_id = Actor::self()->getPid();
217   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
218                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
219   MPI_Status status;
220   //unknow size from the receiver pov
221   if (size <= 0.0) {
222     Request::probe(from, 0, MPI_COMM_WORLD, &status);
223     size = status.count;
224   }
225
226   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
227
228   TRACE_smpi_comm_out(my_proc_id);
229   get_reqq_self()->push_back(request);
230
231   log_timed_action (action, clock);
232 }
233
234 static void action_test(simgrid::xbt::ReplayAction& action)
235 {
236   CHECK_ACTION_PARAMS(action, 0, 0)
237   double clock = smpi_process()->simulated_elapsed();
238   MPI_Status status;
239
240   MPI_Request request = get_reqq_self()->back();
241   get_reqq_self()->pop_back();
242   //if request is null here, this may mean that a previous test has succeeded
243   //Different times in traced application and replayed version may lead to this
244   //In this case, ignore the extra calls.
245   if(request!=nullptr){
246     int my_proc_id = Actor::self()->getPid();
247     TRACE_smpi_testing_in(my_proc_id);
248
249     int flag = Request::test(&request, &status);
250
251     XBT_DEBUG("MPI_Test result: %d", flag);
252     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
253     get_reqq_self()->push_back(request);
254
255     TRACE_smpi_testing_out(my_proc_id);
256   }
257   log_timed_action (action, clock);
258 }
259
260 static void action_wait(simgrid::xbt::ReplayAction& action)
261 {
262   CHECK_ACTION_PARAMS(action, 0, 0)
263   double clock = smpi_process()->simulated_elapsed();
264   MPI_Status status;
265
266   std::string s = boost::algorithm::join(action, " ");
267   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
268   MPI_Request request = get_reqq_self()->back();
269   get_reqq_self()->pop_back();
270
271   if (request==nullptr){
272     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
273     return;
274   }
275
276   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
277
278   MPI_Group group = request->comm()->group();
279   int src_traced = group->rank(request->src());
280   int dst_traced = group->rank(request->dst());
281   int is_wait_for_receive = (request->flags() & RECV);
282   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
283
284   Request::wait(&request, &status);
285
286   TRACE_smpi_comm_out(rank);
287   if (is_wait_for_receive)
288     TRACE_smpi_recv(src_traced, dst_traced, 0);
289   log_timed_action (action, clock);
290 }
291
292 static void action_waitall(simgrid::xbt::ReplayAction& action)
293 {
294   CHECK_ACTION_PARAMS(action, 0, 0)
295   double clock = smpi_process()->simulated_elapsed();
296   const unsigned int count_requests = get_reqq_self()->size();
297
298   if (count_requests>0) {
299     MPI_Status status[count_requests];
300
301     int my_proc_id_traced = Actor::self()->getPid();
302     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
303                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
304     int recvs_snd[count_requests];
305     int recvs_rcv[count_requests];
306     for (unsigned int i = 0; i < count_requests; i++) {
307       const auto& req = (*get_reqq_self())[i];
308       if (req && (req->flags() & RECV)) {
309         recvs_snd[i] = req->src();
310         recvs_rcv[i] = req->dst();
311       } else
312         recvs_snd[i] = -100;
313    }
314    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
315
316    for (unsigned i = 0; i < count_requests; i++) {
317      if (recvs_snd[i]!=-100)
318        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
319    }
320    TRACE_smpi_comm_out(my_proc_id_traced);
321   }
322   log_timed_action (action, clock);
323 }
324
325 static void action_barrier(simgrid::xbt::ReplayAction& action)
326 {
327   double clock = smpi_process()->simulated_elapsed();
328   int my_proc_id = Actor::self()->getPid();
329   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
330
331   Colls::barrier(MPI_COMM_WORLD);
332
333   TRACE_smpi_comm_out(my_proc_id);
334   log_timed_action (action, clock);
335 }
336
337 static void action_bcast(simgrid::xbt::ReplayAction& action)
338 {
339   CHECK_ACTION_PARAMS(action, 1, 2)
340   double size = parse_double(action[2]);
341   double clock = smpi_process()->simulated_elapsed();
342   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
343   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
344   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
345
346   int my_proc_id = Actor::self()->getPid();
347   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
348                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
349                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
350
351   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
352
353   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
354
355   TRACE_smpi_comm_out(my_proc_id);
356   log_timed_action (action, clock);
357 }
358
359 static void action_reduce(simgrid::xbt::ReplayAction& action)
360 {
361   CHECK_ACTION_PARAMS(action, 2, 2)
362   double comm_size = parse_double(action[2]);
363   double comp_size = parse_double(action[3]);
364   double clock = smpi_process()->simulated_elapsed();
365   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
366
367   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
368
369   int my_proc_id = Actor::self()->getPid();
370   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
371                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
372                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
373
374   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
375   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
376   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
377   smpi_execute_flops(comp_size);
378
379   TRACE_smpi_comm_out(my_proc_id);
380   log_timed_action (action, clock);
381 }
382
383 static void action_allReduce(simgrid::xbt::ReplayAction& action)
384 {
385   CHECK_ACTION_PARAMS(action, 2, 1)
386   double comm_size = parse_double(action[2]);
387   double comp_size = parse_double(action[3]);
388
389   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
390
391   double clock = smpi_process()->simulated_elapsed();
392   int my_proc_id = Actor::self()->getPid();
393   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
394                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
395
396   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
397   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
398   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
399   smpi_execute_flops(comp_size);
400
401   TRACE_smpi_comm_out(my_proc_id);
402   log_timed_action (action, clock);
403 }
404
405 static void action_allToAll(simgrid::xbt::ReplayAction& action)
406 {
407   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
408   double clock = smpi_process()->simulated_elapsed();
409   unsigned long comm_size = MPI_COMM_WORLD->size();
410   int send_size = parse_double(action[2]);
411   int recv_size = parse_double(action[3]);
412   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
413   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
414
415   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
416   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
417
418   int my_proc_id = Actor::self()->getPid();
419   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
420                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
421                                                     Datatype::encode(MPI_CURRENT_TYPE),
422                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
423
424   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
425
426   TRACE_smpi_comm_out(my_proc_id);
427   log_timed_action (action, clock);
428 }
429
430 static void action_gather(simgrid::xbt::ReplayAction& action)
431 {
432   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
433         0 gather 68 68 0 0 0
434       where:
435         1) 68 is the sendcounts
436         2) 68 is the recvcounts
437         3) 0 is the root node
438         4) 0 is the send datatype id, see decode_datatype()
439         5) 0 is the recv datatype id, see decode_datatype()
440   */
441   CHECK_ACTION_PARAMS(action, 2, 3)
442   double clock = smpi_process()->simulated_elapsed();
443   unsigned long comm_size = MPI_COMM_WORLD->size();
444   int send_size = parse_double(action[2]);
445   int recv_size = parse_double(action[3]);
446   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
447   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
448
449   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
450   void *recv = nullptr;
451   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
452   int rank = MPI_COMM_WORLD->rank();
453
454   if(rank==root)
455     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
456
457   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
458                                                                         Datatype::encode(MPI_CURRENT_TYPE),
459                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
460
461   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
462
463   TRACE_smpi_comm_out(Actor::self()->getPid());
464   log_timed_action (action, clock);
465 }
466
467 static void action_scatter(simgrid::xbt::ReplayAction& action)
468 {
469   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
470         0 gather 68 68 0 0 0
471       where:
472         1) 68 is the sendcounts
473         2) 68 is the recvcounts
474         3) 0 is the root node
475         4) 0 is the send datatype id, see decode_datatype()
476         5) 0 is the recv datatype id, see decode_datatype()
477   */
478   CHECK_ACTION_PARAMS(action, 2, 3)
479   double clock                   = smpi_process()->simulated_elapsed();
480   unsigned long comm_size        = MPI_COMM_WORLD->size();
481   int send_size                  = parse_double(action[2]);
482   int recv_size                  = parse_double(action[3]);
483   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
484   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
485
486   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
487   void* recv = nullptr;
488   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
489   int rank = MPI_COMM_WORLD->rank();
490
491   if (rank == root)
492     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
493
494   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
495                                                                         Datatype::encode(MPI_CURRENT_TYPE),
496                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
497
498   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
499
500   TRACE_smpi_comm_out(Actor::self()->getPid());
501   log_timed_action(action, clock);
502 }
503
504 static void action_gatherv(simgrid::xbt::ReplayAction& action)
505 {
506   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
507        0 gather 68 68 10 10 10 0 0 0
508      where:
509        1) 68 is the sendcount
510        2) 68 10 10 10 is the recvcounts
511        3) 0 is the root node
512        4) 0 is the send datatype id, see decode_datatype()
513        5) 0 is the recv datatype id, see decode_datatype()
514   */
515   double clock = smpi_process()->simulated_elapsed();
516   unsigned long comm_size = MPI_COMM_WORLD->size();
517   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
518   int send_size = parse_double(action[2]);
519   std::vector<int> disps(comm_size, 0);
520   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
521
522   MPI_Datatype MPI_CURRENT_TYPE =
523       (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
524   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
525                                                                  : MPI_DEFAULT_TYPE};
526
527   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528   void *recv = nullptr;
529   for (unsigned int i = 0; i < comm_size; i++) {
530     (*recvcounts)[i] = std::stoi(action[i + 3]);
531   }
532   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
533
534   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
535   int rank = MPI_COMM_WORLD->rank();
536
537   if(rank==root)
538     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
539
540   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
541                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
542                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
543
544   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
545                  MPI_COMM_WORLD);
546
547   TRACE_smpi_comm_out(Actor::self()->getPid());
548   log_timed_action (action, clock);
549 }
550
551 static void action_scatterv(simgrid::xbt::ReplayAction& action)
552 {
553   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
554        0 gather 68 10 10 10 68 0 0 0
555      where:
556        1) 68 10 10 10 is the sendcounts
557        2) 68 is the recvcount
558        3) 0 is the root node
559        4) 0 is the send datatype id, see decode_datatype()
560        5) 0 is the recv datatype id, see decode_datatype()
561   */
562   double clock  = smpi_process()->simulated_elapsed();
563   unsigned long comm_size = MPI_COMM_WORLD->size();
564   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
565   int recv_size = parse_double(action[2 + comm_size]);
566   std::vector<int> disps(comm_size, 0);
567   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
568
569   MPI_Datatype MPI_CURRENT_TYPE =
570       (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
571   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
572                                                                  : MPI_DEFAULT_TYPE};
573
574   void* send = nullptr;
575   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
576   for (unsigned int i = 0; i < comm_size; i++) {
577     (*sendcounts)[i] = std::stoi(action[i + 2]);
578   }
579   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
580
581   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
582   int rank = MPI_COMM_WORLD->rank();
583
584   if (rank == root)
585     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
586
587   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
588                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
589                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
590
591   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
592                   MPI_COMM_WORLD);
593
594   TRACE_smpi_comm_out(Actor::self()->getPid());
595   log_timed_action(action, clock);
596 }
597
598 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
599 {
600   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
601        0 reduceScatter 275427 275427 275427 204020 11346849 0
602      where:
603        1) The first four values after the name of the action declare the recvcounts array
604        2) The value 11346849 is the amount of instructions
605        3) The last value corresponds to the datatype, see decode_datatype().
606  */
607   double clock = smpi_process()->simulated_elapsed();
608   unsigned long comm_size = MPI_COMM_WORLD->size();
609   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
610   int comp_size = parse_double(action[2+comm_size]);
611   int my_proc_id                     = Actor::self()->getPid();
612   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
613   MPI_Datatype MPI_CURRENT_TYPE =
614       (action.size() > 3 + comm_size) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
615
616   for (unsigned int i = 0; i < comm_size; i++) {
617     recvcounts->push_back(std::stoi(action[i + 2]));
618   }
619   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
620
621   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
622                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
623                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
624                                                        Datatype::encode(MPI_CURRENT_TYPE)));
625
626   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
627   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
628
629   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
630   smpi_execute_flops(comp_size);
631
632   TRACE_smpi_comm_out(my_proc_id);
633   log_timed_action (action, clock);
634 }
635
636 static void action_allgather(simgrid::xbt::ReplayAction& action)
637 {
638   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
639         0 allGather 275427 275427
640     where:
641         1) 275427 is the sendcount
642         2) 275427 is the recvcount
643         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
644   */
645   double clock = smpi_process()->simulated_elapsed();
646
647   CHECK_ACTION_PARAMS(action, 2, 2)
648   int sendcount = std::stoi(action[2]);
649   int recvcount = std::stoi(action[3]);
650
651   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
652   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
653
654   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
655   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
656
657   int my_proc_id = Actor::self()->getPid();
658
659   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
660                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
661                                                     Datatype::encode(MPI_CURRENT_TYPE),
662                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
663
664   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
665
666   TRACE_smpi_comm_out(my_proc_id);
667   log_timed_action (action, clock);
668 }
669
670 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
671 {
672   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
673         0 allGatherV 275427 275427 275427 275427 204020
674      where:
675         1) 275427 is the sendcount
676         2) The next four elements declare the recvcounts array
677         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
678   */
679   double clock = smpi_process()->simulated_elapsed();
680
681   unsigned long comm_size = MPI_COMM_WORLD->size();
682   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
683   int sendcount = std::stoi(action[2]);
684   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
685   std::vector<int> disps(comm_size, 0);
686
687   int datatype_index = 0, disp_index = 0;
688   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
689     datatype_index = 3 + comm_size;
690     disp_index     = datatype_index + 1;
691   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
692     datatype_index = -1;
693     disp_index     = 3 + comm_size;
694   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
695     datatype_index = 3 + comm_size;
696   }
697
698   if (disp_index != 0) {
699     for (unsigned int i = 0; i < comm_size; i++)
700       disps[i]          = std::stoi(action[disp_index + i]);
701   }
702
703   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
704   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
705
706   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
707
708   for (unsigned int i = 0; i < comm_size; i++) {
709     (*recvcounts)[i] = std::stoi(action[i + 3]);
710   }
711   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
712   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
713
714   int my_proc_id = Actor::self()->getPid();
715
716   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
717                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
718                                                        Datatype::encode(MPI_CURRENT_TYPE),
719                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
720
721   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
722                     MPI_COMM_WORLD);
723
724   TRACE_smpi_comm_out(my_proc_id);
725   log_timed_action (action, clock);
726 }
727
728 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
729 {
730   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
731         0 allToAllV 100 1 7 10 12 100 1 70 10 5
732      where:
733         1) 100 is the size of the send buffer *sizeof(int),
734         2) 1 7 10 12 is the sendcounts array
735         3) 100*sizeof(int) is the size of the receiver buffer
736         4)  1 70 10 5 is the recvcounts array
737   */
738   double clock = smpi_process()->simulated_elapsed();
739
740   unsigned long comm_size = MPI_COMM_WORLD->size();
741   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
742   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
743   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
744   std::vector<int> senddisps(comm_size, 0);
745   std::vector<int> recvdisps(comm_size, 0);
746
747   MPI_Datatype MPI_CURRENT_TYPE =
748       (action.size() > 5 + 2 * comm_size) ? decode_datatype(action[4 + 2 * comm_size]) : MPI_DEFAULT_TYPE;
749   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size) ? decode_datatype(action[5 + 2 * comm_size])
750                                                                      : MPI_DEFAULT_TYPE};
751
752   int send_buf_size=parse_double(action[2]);
753   int recv_buf_size=parse_double(action[3+comm_size]);
754   int my_proc_id = Actor::self()->getPid();
755   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
756   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
757
758   for (unsigned int i = 0; i < comm_size; i++) {
759     (*sendcounts)[i] = std::stoi(action[3 + i]);
760     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
761   }
762   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
763   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
764
765   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
766                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
767                                                        Datatype::encode(MPI_CURRENT_TYPE),
768                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
769
770   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
771                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
772
773   TRACE_smpi_comm_out(my_proc_id);
774   log_timed_action (action, clock);
775 }
776
777 }} // namespace simgrid::smpi
778
779 /** @brief Only initialize the replay, don't do it for real */
780 void smpi_replay_init(int* argc, char*** argv)
781 {
782   simgrid::smpi::Process::init(argc, argv);
783   smpi_process()->mark_as_initialized();
784   smpi_process()->set_replaying(true);
785
786   int my_proc_id = Actor::self()->getPid();
787   TRACE_smpi_init(my_proc_id);
788   TRACE_smpi_computing_init(my_proc_id);
789   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
790   TRACE_smpi_comm_out(my_proc_id);
791   xbt_replay_action_register("init",       simgrid::smpi::action_init);
792   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
793   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
794   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
795   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
796   xbt_replay_action_register("send",       simgrid::smpi::action_send);
797   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
798   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
799   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
800   xbt_replay_action_register("test",       simgrid::smpi::action_test);
801   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
802   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
803   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
804   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
805   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
806   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
807   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
808   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
809   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
810   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
811   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
812   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
813   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
814   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
815   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
816   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
817
818   //if we have a delayed start, sleep here.
819   if(*argc>2){
820     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
821     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
822     smpi_execute_flops(value);
823   } else {
824     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
825     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
826     smpi_execute_flops(0.0);
827   }
828 }
829
830 /** @brief actually run the replay after initialization */
831 void smpi_replay_main(int* argc, char*** argv)
832 {
833   simgrid::xbt::replay_runner(*argc, *argv);
834
835   /* and now, finalize everything */
836   /* One active process will stop. Decrease the counter*/
837   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
838   if (not get_reqq_self()->empty()) {
839     unsigned int count_requests=get_reqq_self()->size();
840     MPI_Request requests[count_requests];
841     MPI_Status status[count_requests];
842     unsigned int i=0;
843
844     for (auto const& req : *get_reqq_self()) {
845       requests[i] = req;
846       i++;
847     }
848     simgrid::smpi::Request::waitall(count_requests, requests, status);
849   }
850   delete get_reqq_self();
851   active_processes--;
852
853   if(active_processes==0){
854     /* Last process alive speaking: end the simulated timer */
855     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
856     smpi_free_replay_tmp_buffers();
857   }
858
859   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
860
861   smpi_process()->finalize();
862
863   TRACE_smpi_comm_out(Actor::self()->getPid());
864   TRACE_smpi_finalize(Actor::self()->getPid());
865 }
866
867 /** @brief chain a replay initialization and a replay start */
868 void smpi_replay_run(int* argc, char*** argv)
869 {
870   smpi_replay_init(argc, argv);
871   smpi_replay_main(argc, argv);
872 }