Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Remove all references to decode_datatype
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 static void action_init(simgrid::xbt::ReplayAction& action)
69 {
70   XBT_DEBUG("Initialize the counters");
71   CHECK_ACTION_PARAMS(action, 0, 1)
72   if (action.size() > 2)
73     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
74   else
75     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
76
77   /* start a simulated timer */
78   smpi_process()->simulated_start();
79   /*initialize the number of active processes */
80   active_processes = smpi_process_count();
81
82   set_reqq_self(new std::vector<MPI_Request>);
83 }
84
85 static void action_finalize(simgrid::xbt::ReplayAction& action)
86 {
87   /* Nothing to do */
88 }
89
90 static void action_comm_size(simgrid::xbt::ReplayAction& action)
91 {
92   communicator_size = parse_double(action[2]);
93   log_timed_action (action, smpi_process()->simulated_elapsed());
94 }
95
96 static void action_comm_split(simgrid::xbt::ReplayAction& action)
97 {
98   log_timed_action (action, smpi_process()->simulated_elapsed());
99 }
100
101 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
102 {
103   log_timed_action (action, smpi_process()->simulated_elapsed());
104 }
105
106 static void action_compute(simgrid::xbt::ReplayAction& action)
107 {
108   CHECK_ACTION_PARAMS(action, 1, 0)
109   double clock = smpi_process()->simulated_elapsed();
110   double flops= parse_double(action[2]);
111   int my_proc_id = Actor::self()->getPid();
112
113   TRACE_smpi_computing_in(my_proc_id, flops);
114   smpi_execute_flops(flops);
115   TRACE_smpi_computing_out(my_proc_id);
116
117   log_timed_action (action, clock);
118 }
119
120 static void action_send(simgrid::xbt::ReplayAction& action)
121 {
122   CHECK_ACTION_PARAMS(action, 2, 1)
123   int to       = std::stoi(action[2]);
124   double size=parse_double(action[3]);
125   double clock = smpi_process()->simulated_elapsed();
126
127   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
128
129   int my_proc_id = Actor::self()->getPid();
130   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
131
132   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
133                      new simgrid::instr::Pt2PtTIData("send", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
134   if (not TRACE_smpi_view_internals())
135     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
136
137   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
138
139   TRACE_smpi_comm_out(my_proc_id);
140
141   log_timed_action(action, clock);
142 }
143
144 static void action_Isend(simgrid::xbt::ReplayAction& action)
145 {
146   CHECK_ACTION_PARAMS(action, 2, 1)
147   int to       = std::stoi(action[2]);
148   double size=parse_double(action[3]);
149   double clock = smpi_process()->simulated_elapsed();
150
151   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
152
153   int my_proc_id = Actor::self()->getPid();
154   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
155   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
156                      new simgrid::instr::Pt2PtTIData("Isend", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
157   if (not TRACE_smpi_view_internals())
158     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
159
160   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
161
162   TRACE_smpi_comm_out(my_proc_id);
163
164   get_reqq_self()->push_back(request);
165
166   log_timed_action (action, clock);
167 }
168
169 static void action_recv(simgrid::xbt::ReplayAction& action)
170 {
171   CHECK_ACTION_PARAMS(action, 2, 1)
172   int from     = std::stoi(action[2]);
173   double size=parse_double(action[3]);
174   double clock = smpi_process()->simulated_elapsed();
175   MPI_Status status;
176
177   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
178
179   int my_proc_id = Actor::self()->getPid();
180   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
181
182   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
183                      new simgrid::instr::Pt2PtTIData("recv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
184
185   //unknown size from the receiver point of view
186   if (size <= 0.0) {
187     Request::probe(from, 0, MPI_COMM_WORLD, &status);
188     size=status.count;
189   }
190
191   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
192
193   TRACE_smpi_comm_out(my_proc_id);
194   if (not TRACE_smpi_view_internals()) {
195     TRACE_smpi_recv(src_traced, my_proc_id, 0);
196   }
197
198   log_timed_action (action, clock);
199 }
200
201 static void action_Irecv(simgrid::xbt::ReplayAction& action)
202 {
203   CHECK_ACTION_PARAMS(action, 2, 1)
204   int from     = std::stoi(action[2]);
205   double size=parse_double(action[3]);
206   double clock = smpi_process()->simulated_elapsed();
207
208   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
209
210   int my_proc_id = Actor::self()->getPid();
211   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
212                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
213   MPI_Status status;
214   //unknow size from the receiver pov
215   if (size <= 0.0) {
216     Request::probe(from, 0, MPI_COMM_WORLD, &status);
217     size = status.count;
218   }
219
220   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
221
222   TRACE_smpi_comm_out(my_proc_id);
223   get_reqq_self()->push_back(request);
224
225   log_timed_action (action, clock);
226 }
227
228 static void action_test(simgrid::xbt::ReplayAction& action)
229 {
230   CHECK_ACTION_PARAMS(action, 0, 0)
231   double clock = smpi_process()->simulated_elapsed();
232   MPI_Status status;
233
234   MPI_Request request = get_reqq_self()->back();
235   get_reqq_self()->pop_back();
236   //if request is null here, this may mean that a previous test has succeeded
237   //Different times in traced application and replayed version may lead to this
238   //In this case, ignore the extra calls.
239   if(request!=nullptr){
240     int my_proc_id = Actor::self()->getPid();
241     TRACE_smpi_testing_in(my_proc_id);
242
243     int flag = Request::test(&request, &status);
244
245     XBT_DEBUG("MPI_Test result: %d", flag);
246     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
247     get_reqq_self()->push_back(request);
248
249     TRACE_smpi_testing_out(my_proc_id);
250   }
251   log_timed_action (action, clock);
252 }
253
254 static void action_wait(simgrid::xbt::ReplayAction& action)
255 {
256   CHECK_ACTION_PARAMS(action, 0, 0)
257   double clock = smpi_process()->simulated_elapsed();
258   MPI_Status status;
259
260   std::string s = boost::algorithm::join(action, " ");
261   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
262   MPI_Request request = get_reqq_self()->back();
263   get_reqq_self()->pop_back();
264
265   if (request==nullptr){
266     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
267     return;
268   }
269
270   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
271
272   MPI_Group group = request->comm()->group();
273   int src_traced = group->rank(request->src());
274   int dst_traced = group->rank(request->dst());
275   int is_wait_for_receive = (request->flags() & RECV);
276   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
277
278   Request::wait(&request, &status);
279
280   TRACE_smpi_comm_out(rank);
281   if (is_wait_for_receive)
282     TRACE_smpi_recv(src_traced, dst_traced, 0);
283   log_timed_action (action, clock);
284 }
285
286 static void action_waitall(simgrid::xbt::ReplayAction& action)
287 {
288   CHECK_ACTION_PARAMS(action, 0, 0)
289   double clock = smpi_process()->simulated_elapsed();
290   const unsigned int count_requests = get_reqq_self()->size();
291
292   if (count_requests>0) {
293     MPI_Status status[count_requests];
294
295     int my_proc_id_traced = Actor::self()->getPid();
296     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
297                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
298     int recvs_snd[count_requests];
299     int recvs_rcv[count_requests];
300     for (unsigned int i = 0; i < count_requests; i++) {
301       const auto& req = (*get_reqq_self())[i];
302       if (req && (req->flags() & RECV)) {
303         recvs_snd[i] = req->src();
304         recvs_rcv[i] = req->dst();
305       } else
306         recvs_snd[i] = -100;
307    }
308    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
309
310    for (unsigned i = 0; i < count_requests; i++) {
311      if (recvs_snd[i]!=-100)
312        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
313    }
314    TRACE_smpi_comm_out(my_proc_id_traced);
315   }
316   log_timed_action (action, clock);
317 }
318
319 static void action_barrier(simgrid::xbt::ReplayAction& action)
320 {
321   double clock = smpi_process()->simulated_elapsed();
322   int my_proc_id = Actor::self()->getPid();
323   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
324
325   Colls::barrier(MPI_COMM_WORLD);
326
327   TRACE_smpi_comm_out(my_proc_id);
328   log_timed_action (action, clock);
329 }
330
331 static void action_bcast(simgrid::xbt::ReplayAction& action)
332 {
333   CHECK_ACTION_PARAMS(action, 1, 2)
334   double size = parse_double(action[2]);
335   double clock = smpi_process()->simulated_elapsed();
336   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
337   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
338   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
339
340   int my_proc_id = Actor::self()->getPid();
341   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
342                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
343                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
344
345   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
346
347   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
348
349   TRACE_smpi_comm_out(my_proc_id);
350   log_timed_action (action, clock);
351 }
352
353 static void action_reduce(simgrid::xbt::ReplayAction& action)
354 {
355   CHECK_ACTION_PARAMS(action, 2, 2)
356   double comm_size = parse_double(action[2]);
357   double comp_size = parse_double(action[3]);
358   double clock = smpi_process()->simulated_elapsed();
359   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
360
361   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
362
363   int my_proc_id = Actor::self()->getPid();
364   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
365                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
366                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
367
368   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
369   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
370   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
371   smpi_execute_flops(comp_size);
372
373   TRACE_smpi_comm_out(my_proc_id);
374   log_timed_action (action, clock);
375 }
376
377 static void action_allReduce(simgrid::xbt::ReplayAction& action)
378 {
379   CHECK_ACTION_PARAMS(action, 2, 1)
380   double comm_size = parse_double(action[2]);
381   double comp_size = parse_double(action[3]);
382
383   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
384
385   double clock = smpi_process()->simulated_elapsed();
386   int my_proc_id = Actor::self()->getPid();
387   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
388                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
389
390   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
391   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
392   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
393   smpi_execute_flops(comp_size);
394
395   TRACE_smpi_comm_out(my_proc_id);
396   log_timed_action (action, clock);
397 }
398
399 static void action_allToAll(simgrid::xbt::ReplayAction& action)
400 {
401   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
402   double clock = smpi_process()->simulated_elapsed();
403   unsigned long comm_size = MPI_COMM_WORLD->size();
404   int send_size = parse_double(action[2]);
405   int recv_size = parse_double(action[3]);
406   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
407   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
408
409   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
410   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
411
412   int my_proc_id = Actor::self()->getPid();
413   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
414                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
415                                                     Datatype::encode(MPI_CURRENT_TYPE),
416                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
417
418   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
419
420   TRACE_smpi_comm_out(my_proc_id);
421   log_timed_action (action, clock);
422 }
423
424 static void action_gather(simgrid::xbt::ReplayAction& action)
425 {
426   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
427         0 gather 68 68 0 0 0
428       where:
429         1) 68 is the sendcounts
430         2) 68 is the recvcounts
431         3) 0 is the root node
432         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
433         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
434   */
435   CHECK_ACTION_PARAMS(action, 2, 3)
436   double clock = smpi_process()->simulated_elapsed();
437   unsigned long comm_size = MPI_COMM_WORLD->size();
438   int send_size = parse_double(action[2]);
439   int recv_size = parse_double(action[3]);
440   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
441   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
442
443   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
444   void *recv = nullptr;
445   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
446   int rank = MPI_COMM_WORLD->rank();
447
448   if(rank==root)
449     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
450
451   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
452                                                                         Datatype::encode(MPI_CURRENT_TYPE),
453                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
454
455   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
456
457   TRACE_smpi_comm_out(Actor::self()->getPid());
458   log_timed_action (action, clock);
459 }
460
461 static void action_scatter(simgrid::xbt::ReplayAction& action)
462 {
463   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
464         0 gather 68 68 0 0 0
465       where:
466         1) 68 is the sendcounts
467         2) 68 is the recvcounts
468         3) 0 is the root node
469         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
470         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
471   */
472   CHECK_ACTION_PARAMS(action, 2, 3)
473   double clock                   = smpi_process()->simulated_elapsed();
474   unsigned long comm_size        = MPI_COMM_WORLD->size();
475   int send_size                  = parse_double(action[2]);
476   int recv_size                  = parse_double(action[3]);
477   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
478   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
479
480   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
481   void* recv = nullptr;
482   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
483   int rank = MPI_COMM_WORLD->rank();
484
485   if (rank == root)
486     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
487
488   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
489                                                                         Datatype::encode(MPI_CURRENT_TYPE),
490                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
491
492   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
493
494   TRACE_smpi_comm_out(Actor::self()->getPid());
495   log_timed_action(action, clock);
496 }
497
498 static void action_gatherv(simgrid::xbt::ReplayAction& action)
499 {
500   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
501        0 gather 68 68 10 10 10 0 0 0
502      where:
503        1) 68 is the sendcount
504        2) 68 10 10 10 is the recvcounts
505        3) 0 is the root node
506        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
507        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
508   */
509   double clock = smpi_process()->simulated_elapsed();
510   unsigned long comm_size = MPI_COMM_WORLD->size();
511   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
512   int send_size = parse_double(action[2]);
513   std::vector<int> disps(comm_size, 0);
514   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
515
516   MPI_Datatype MPI_CURRENT_TYPE =
517       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
518   MPI_Datatype MPI_CURRENT_TYPE2{
519       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
520
521   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
522   void *recv = nullptr;
523   for (unsigned int i = 0; i < comm_size; i++) {
524     (*recvcounts)[i] = std::stoi(action[i + 3]);
525   }
526   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
527
528   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
529   int rank = MPI_COMM_WORLD->rank();
530
531   if(rank==root)
532     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
533
534   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
535                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
536                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
537
538   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
539                  MPI_COMM_WORLD);
540
541   TRACE_smpi_comm_out(Actor::self()->getPid());
542   log_timed_action (action, clock);
543 }
544
545 static void action_scatterv(simgrid::xbt::ReplayAction& action)
546 {
547   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
548        0 gather 68 10 10 10 68 0 0 0
549      where:
550        1) 68 10 10 10 is the sendcounts
551        2) 68 is the recvcount
552        3) 0 is the root node
553        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
554        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
555   */
556   double clock  = smpi_process()->simulated_elapsed();
557   unsigned long comm_size = MPI_COMM_WORLD->size();
558   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
559   int recv_size = parse_double(action[2 + comm_size]);
560   std::vector<int> disps(comm_size, 0);
561   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
562
563   MPI_Datatype MPI_CURRENT_TYPE =
564       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
565   MPI_Datatype MPI_CURRENT_TYPE2{
566       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
567
568   void* send = nullptr;
569   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
570   for (unsigned int i = 0; i < comm_size; i++) {
571     (*sendcounts)[i] = std::stoi(action[i + 2]);
572   }
573   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
574
575   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
576   int rank = MPI_COMM_WORLD->rank();
577
578   if (rank == root)
579     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
580
581   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
582                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
583                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
584
585   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
586                   MPI_COMM_WORLD);
587
588   TRACE_smpi_comm_out(Actor::self()->getPid());
589   log_timed_action(action, clock);
590 }
591
592 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
593 {
594   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
595        0 reduceScatter 275427 275427 275427 204020 11346849 0
596      where:
597        1) The first four values after the name of the action declare the recvcounts array
598        2) The value 11346849 is the amount of instructions
599        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
600  */
601   double clock = smpi_process()->simulated_elapsed();
602   unsigned long comm_size = MPI_COMM_WORLD->size();
603   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
604   int comp_size = parse_double(action[2+comm_size]);
605   int my_proc_id                     = Actor::self()->getPid();
606   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
607   MPI_Datatype MPI_CURRENT_TYPE =
608       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
609
610   for (unsigned int i = 0; i < comm_size; i++) {
611     recvcounts->push_back(std::stoi(action[i + 2]));
612   }
613   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
614
615   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
616                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
617                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
618                                                        Datatype::encode(MPI_CURRENT_TYPE)));
619
620   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
621   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
622
623   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
624   smpi_execute_flops(comp_size);
625
626   TRACE_smpi_comm_out(my_proc_id);
627   log_timed_action (action, clock);
628 }
629
630 static void action_allgather(simgrid::xbt::ReplayAction& action)
631 {
632   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
633         0 allGather 275427 275427
634     where:
635         1) 275427 is the sendcount
636         2) 275427 is the recvcount
637         3) No more values mean that the datatype for sent and receive buffer is the default one, see
638     simgrid::smpi::Datatype::decode().
639   */
640   double clock = smpi_process()->simulated_elapsed();
641
642   CHECK_ACTION_PARAMS(action, 2, 2)
643   int sendcount = std::stoi(action[2]);
644   int recvcount = std::stoi(action[3]);
645
646   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
647   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
648
649   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
650   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
651
652   int my_proc_id = Actor::self()->getPid();
653
654   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
655                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
656                                                     Datatype::encode(MPI_CURRENT_TYPE),
657                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
658
659   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
660
661   TRACE_smpi_comm_out(my_proc_id);
662   log_timed_action (action, clock);
663 }
664
665 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
666 {
667   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
668         0 allGatherV 275427 275427 275427 275427 204020
669      where:
670         1) 275427 is the sendcount
671         2) The next four elements declare the recvcounts array
672         3) No more values mean that the datatype for sent and receive buffer is the default one, see
673      simgrid::smpi::Datatype::decode().
674   */
675   double clock = smpi_process()->simulated_elapsed();
676
677   unsigned long comm_size = MPI_COMM_WORLD->size();
678   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
679   int sendcount = std::stoi(action[2]);
680   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
681   std::vector<int> disps(comm_size, 0);
682
683   int datatype_index = 0, disp_index = 0;
684   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
685     datatype_index = 3 + comm_size;
686     disp_index     = datatype_index + 1;
687   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
688     datatype_index = -1;
689     disp_index     = 3 + comm_size;
690   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
691     datatype_index = 3 + comm_size;
692   }
693
694   if (disp_index != 0) {
695     for (unsigned int i = 0; i < comm_size; i++)
696       disps[i]          = std::stoi(action[disp_index + i]);
697   }
698
699   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
700                                                      : MPI_DEFAULT_TYPE};
701   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
702                                                       : MPI_DEFAULT_TYPE};
703
704   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
705
706   for (unsigned int i = 0; i < comm_size; i++) {
707     (*recvcounts)[i] = std::stoi(action[i + 3]);
708   }
709   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
710   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
711
712   int my_proc_id = Actor::self()->getPid();
713
714   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
715                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
716                                                        Datatype::encode(MPI_CURRENT_TYPE),
717                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
718
719   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
720                     MPI_COMM_WORLD);
721
722   TRACE_smpi_comm_out(my_proc_id);
723   log_timed_action (action, clock);
724 }
725
726 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
727 {
728   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
729         0 allToAllV 100 1 7 10 12 100 1 70 10 5
730      where:
731         1) 100 is the size of the send buffer *sizeof(int),
732         2) 1 7 10 12 is the sendcounts array
733         3) 100*sizeof(int) is the size of the receiver buffer
734         4)  1 70 10 5 is the recvcounts array
735   */
736   double clock = smpi_process()->simulated_elapsed();
737
738   unsigned long comm_size = MPI_COMM_WORLD->size();
739   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
740   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
741   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
742   std::vector<int> senddisps(comm_size, 0);
743   std::vector<int> recvdisps(comm_size, 0);
744
745   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
746                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
747                                       : MPI_DEFAULT_TYPE;
748   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
749                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
750                                      : MPI_DEFAULT_TYPE};
751
752   int send_buf_size=parse_double(action[2]);
753   int recv_buf_size=parse_double(action[3+comm_size]);
754   int my_proc_id = Actor::self()->getPid();
755   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
756   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
757
758   for (unsigned int i = 0; i < comm_size; i++) {
759     (*sendcounts)[i] = std::stoi(action[3 + i]);
760     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
761   }
762   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
763   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
764
765   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
766                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
767                                                        Datatype::encode(MPI_CURRENT_TYPE),
768                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
769
770   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
771                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
772
773   TRACE_smpi_comm_out(my_proc_id);
774   log_timed_action (action, clock);
775 }
776
777 }} // namespace simgrid::smpi
778
779 /** @brief Only initialize the replay, don't do it for real */
780 void smpi_replay_init(int* argc, char*** argv)
781 {
782   simgrid::smpi::Process::init(argc, argv);
783   smpi_process()->mark_as_initialized();
784   smpi_process()->set_replaying(true);
785
786   int my_proc_id = Actor::self()->getPid();
787   TRACE_smpi_init(my_proc_id);
788   TRACE_smpi_computing_init(my_proc_id);
789   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
790   TRACE_smpi_comm_out(my_proc_id);
791   xbt_replay_action_register("init",       simgrid::smpi::action_init);
792   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
793   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
794   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
795   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
796   xbt_replay_action_register("send",       simgrid::smpi::action_send);
797   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
798   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
799   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
800   xbt_replay_action_register("test",       simgrid::smpi::action_test);
801   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
802   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
803   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
804   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
805   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
806   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
807   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
808   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
809   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
810   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
811   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
812   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
813   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
814   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
815   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
816   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
817
818   //if we have a delayed start, sleep here.
819   if(*argc>2){
820     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
821     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
822     smpi_execute_flops(value);
823   } else {
824     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
825     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
826     smpi_execute_flops(0.0);
827   }
828 }
829
830 /** @brief actually run the replay after initialization */
831 void smpi_replay_main(int* argc, char*** argv)
832 {
833   simgrid::xbt::replay_runner(*argc, *argv);
834
835   /* and now, finalize everything */
836   /* One active process will stop. Decrease the counter*/
837   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
838   if (not get_reqq_self()->empty()) {
839     unsigned int count_requests=get_reqq_self()->size();
840     MPI_Request requests[count_requests];
841     MPI_Status status[count_requests];
842     unsigned int i=0;
843
844     for (auto const& req : *get_reqq_self()) {
845       requests[i] = req;
846       i++;
847     }
848     simgrid::smpi::Request::waitall(count_requests, requests, status);
849   }
850   delete get_reqq_self();
851   active_processes--;
852
853   if(active_processes==0){
854     /* Last process alive speaking: end the simulated timer */
855     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
856     smpi_free_replay_tmp_buffers();
857   }
858
859   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
860
861   smpi_process()->finalize();
862
863   TRACE_smpi_comm_out(Actor::self()->getPid());
864   TRACE_smpi_finalize(Actor::self()->getPid());
865 }
866
867 /** @brief chain a replay initialization and a replay start */
868 void smpi_replay_run(int* argc, char*** argv)
869 {
870   smpi_replay_init(argc, argv);
871   smpi_replay_main(argc, argv);
872 }