Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
db6dd8309a9b1ca5df6cbf0ee6ec2d992aa3d8c6
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <numeric>
16 #include <unordered_map>
17 #include <vector>
18
19 using simgrid::s4u::Actor;
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
22
23 static int communicator_size = 0;
24 static int active_processes  = 0;
25 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
26
27 static MPI_Datatype MPI_DEFAULT_TYPE;
28 static MPI_Datatype MPI_CURRENT_TYPE;
29
30 static int sendbuffer_size = 0;
31 static char* sendbuffer    = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer    = nullptr;
34
35 static void log_timed_action (const char *const *action, double clock){
36   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
37     char *name = xbt_str_join_array(action, " ");
38     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
39     xbt_free(name);
40   }
41 }
42
43 static std::vector<MPI_Request>* get_reqq_self()
44 {
45   return reqq.at(Actor::self()->getPid());
46 }
47
48 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 {
50    reqq.insert({Actor::self()->getPid(), mpi_request});
51 }
52
53 //allocate a single buffer for all sends, growing it if needed
54 void* smpi_get_tmp_sendbuffer(int size)
55 {
56   if (not smpi_process()->replaying())
57     return xbt_malloc(size);
58   if (sendbuffer_size<size){
59     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
60     sendbuffer_size=size;
61   }
62   return sendbuffer;
63 }
64
65 //allocate a single buffer for all recv
66 void* smpi_get_tmp_recvbuffer(int size){
67   if (not smpi_process()->replaying())
68     return xbt_malloc(size);
69   if (recvbuffer_size<size){
70     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
71     recvbuffer_size=size;
72   }
73   return recvbuffer;
74 }
75
76 void smpi_free_tmp_buffer(void* buf){
77   if (not smpi_process()->replaying())
78     xbt_free(buf);
79 }
80
81 /* Helper function */
82 static double parse_double(const char *string)
83 {
84   char *endptr;
85   double value = strtod(string, &endptr);
86   if (*endptr != '\0')
87     THROWF(unknown_error, 0, "%s is not a double", string);
88   return value;
89 }
90
91
92 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
93 static MPI_Datatype decode_datatype(const char *const action)
94 {
95   switch(atoi(action)) {
96     case 0:
97       return MPI_DOUBLE;
98       break;
99     case 1:
100       return MPI_INT;
101       break;
102     case 2:
103       return MPI_CHAR;
104       break;
105     case 3:
106       return MPI_SHORT;
107       break;
108     case 4:
109       return MPI_LONG;
110       break;
111     case 5:
112       return MPI_FLOAT;
113       break;
114     case 6:
115       return MPI_BYTE;
116       break;
117     default:
118       return MPI_DEFAULT_TYPE;
119       break;
120   }
121 }
122
123 const char* encode_datatype(MPI_Datatype datatype)
124 {
125   if (datatype==MPI_BYTE)
126       return "";
127   if(datatype==MPI_DOUBLE)
128       return "0";
129   if(datatype==MPI_INT)
130       return "1";
131   if(datatype==MPI_CHAR)
132       return "2";
133   if(datatype==MPI_SHORT)
134       return "3";
135   if(datatype==MPI_LONG)
136     return "4";
137   if(datatype==MPI_FLOAT)
138       return "5";
139   // default - not implemented.
140   // do not warn here as we pass in this function even for other trace formats
141   return "-1";
142 }
143
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145     int i=0;\
146     while(action[i]!=nullptr)\
147      i++;\
148     if(i<mandatory+2)                                           \
149     THROWF(arg_error, 0, "%s replay failed.\n" \
150           "%d items were given on the line. First two should be process_id and action.  " \
151           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
153   }
154
155 namespace simgrid {
156 namespace smpi {
157
158 static void action_init(const char *const *action)
159 {
160   XBT_DEBUG("Initialize the counters");
161   CHECK_ACTION_PARAMS(action, 0, 1)
162   if(action[2])
163     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
164   else
165     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process()->simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* Nothing to do */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   communicator_size = parse_double(action[2]);
183   log_timed_action (action, smpi_process()->simulated_elapsed());
184 }
185
186 static void action_comm_split(const char *const *action)
187 {
188   log_timed_action (action, smpi_process()->simulated_elapsed());
189 }
190
191 static void action_comm_dup(const char *const *action)
192 {
193   log_timed_action (action, smpi_process()->simulated_elapsed());
194 }
195
196 static void action_compute(const char *const *action)
197 {
198   CHECK_ACTION_PARAMS(action, 1, 0)
199   double clock = smpi_process()->simulated_elapsed();
200   double flops= parse_double(action[2]);
201   int my_proc_id = Actor::self()->getPid();
202
203   TRACE_smpi_computing_in(my_proc_id, flops);
204   smpi_execute_flops(flops);
205   TRACE_smpi_computing_out(my_proc_id);
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_send(const char *const *action)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   int to = atoi(action[2]);
214   double size=parse_double(action[3]);
215   double clock = smpi_process()->simulated_elapsed();
216
217   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
218
219   int my_proc_id = Actor::self()->getPid();
220   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
221
222   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
223                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
224   if (not TRACE_smpi_view_internals())
225     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
226
227   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
228
229   TRACE_smpi_comm_out(my_proc_id);
230
231   log_timed_action(action, clock);
232 }
233
234 static void action_Isend(const char *const *action)
235 {
236   CHECK_ACTION_PARAMS(action, 2, 1)
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process()->simulated_elapsed();
240
241   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
242
243   int my_proc_id = Actor::self()->getPid();
244   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
245   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
246                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
247   if (not TRACE_smpi_view_internals())
248     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
249
250   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
251
252   TRACE_smpi_comm_out(my_proc_id);
253
254   get_reqq_self()->push_back(request);
255
256   log_timed_action (action, clock);
257 }
258
259 static void action_recv(const char *const *action) {
260   CHECK_ACTION_PARAMS(action, 2, 1)
261   int from = atoi(action[2]);
262   double size=parse_double(action[3]);
263   double clock = smpi_process()->simulated_elapsed();
264   MPI_Status status;
265
266   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
267
268   int my_proc_id = Actor::self()->getPid();
269   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
270
271   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
272                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
273
274   //unknown size from the receiver point of view
275   if (size <= 0.0) {
276     Request::probe(from, 0, MPI_COMM_WORLD, &status);
277     size=status.count;
278   }
279
280   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
281
282   TRACE_smpi_comm_out(my_proc_id);
283   if (not TRACE_smpi_view_internals()) {
284     TRACE_smpi_recv(src_traced, my_proc_id, 0);
285   }
286
287   log_timed_action (action, clock);
288 }
289
290 static void action_Irecv(const char *const *action)
291 {
292   CHECK_ACTION_PARAMS(action, 2, 1)
293   int from = atoi(action[2]);
294   double size=parse_double(action[3]);
295   double clock = smpi_process()->simulated_elapsed();
296
297   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
298
299   int my_proc_id = Actor::self()->getPid();
300   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
301                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
302   MPI_Status status;
303   //unknow size from the receiver pov
304   if (size <= 0.0) {
305     Request::probe(from, 0, MPI_COMM_WORLD, &status);
306     size = status.count;
307   }
308
309   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
310
311   TRACE_smpi_comm_out(my_proc_id);
312   get_reqq_self()->push_back(request);
313
314   log_timed_action (action, clock);
315 }
316
317 static void action_test(const char* const* action)
318 {
319   CHECK_ACTION_PARAMS(action, 0, 0)
320   double clock = smpi_process()->simulated_elapsed();
321   MPI_Status status;
322
323   MPI_Request request = get_reqq_self()->back();
324   get_reqq_self()->pop_back();
325   //if request is null here, this may mean that a previous test has succeeded
326   //Different times in traced application and replayed version may lead to this
327   //In this case, ignore the extra calls.
328   if(request!=nullptr){
329     int my_proc_id = Actor::self()->getPid();
330     TRACE_smpi_testing_in(my_proc_id);
331
332     int flag = Request::test(&request, &status);
333
334     XBT_DEBUG("MPI_Test result: %d", flag);
335     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
336     get_reqq_self()->push_back(request);
337
338     TRACE_smpi_testing_out(my_proc_id);
339   }
340   log_timed_action (action, clock);
341 }
342
343 static void action_wait(const char *const *action){
344   CHECK_ACTION_PARAMS(action, 0, 0)
345   double clock = smpi_process()->simulated_elapsed();
346   MPI_Status status;
347
348   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
349       xbt_str_join_array(action," "));
350   MPI_Request request = get_reqq_self()->back();
351   get_reqq_self()->pop_back();
352
353   if (request==nullptr){
354     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
355     return;
356   }
357
358   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
359
360   MPI_Group group = request->comm()->group();
361   int src_traced = group->rank(request->src());
362   int dst_traced = group->rank(request->dst());
363   int is_wait_for_receive = (request->flags() & RECV);
364   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
365
366   Request::wait(&request, &status);
367
368   TRACE_smpi_comm_out(rank);
369   if (is_wait_for_receive)
370     TRACE_smpi_recv(src_traced, dst_traced, 0);
371   log_timed_action (action, clock);
372 }
373
374 static void action_waitall(const char *const *action){
375   CHECK_ACTION_PARAMS(action, 0, 0)
376   double clock = smpi_process()->simulated_elapsed();
377   const unsigned int count_requests = get_reqq_self()->size();
378
379   if (count_requests>0) {
380     MPI_Status status[count_requests];
381
382     int my_proc_id_traced = Actor::self()->getPid();
383     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
384                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
385     int recvs_snd[count_requests];
386     int recvs_rcv[count_requests];
387     for (unsigned int i = 0; i < count_requests; i++) {
388       const auto& req = (*get_reqq_self())[i];
389       if (req && (req->flags() & RECV)) {
390         recvs_snd[i] = req->src();
391         recvs_rcv[i] = req->dst();
392       } else
393         recvs_snd[i] = -100;
394    }
395    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
396
397    for (unsigned i = 0; i < count_requests; i++) {
398      if (recvs_snd[i]!=-100)
399        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
400    }
401    TRACE_smpi_comm_out(my_proc_id_traced);
402   }
403   log_timed_action (action, clock);
404 }
405
406 static void action_barrier(const char *const *action){
407   double clock = smpi_process()->simulated_elapsed();
408   int my_proc_id = Actor::self()->getPid();
409   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
410
411   Colls::barrier(MPI_COMM_WORLD);
412
413   TRACE_smpi_comm_out(my_proc_id);
414   log_timed_action (action, clock);
415 }
416
417 static void action_bcast(const char *const *action)
418 {
419   CHECK_ACTION_PARAMS(action, 1, 2)
420   double size = parse_double(action[2]);
421   double clock = smpi_process()->simulated_elapsed();
422   int root     = (action[3]) ? atoi(action[3]) : 0;
423   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
424   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
425
426   int my_proc_id = Actor::self()->getPid();
427   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
428                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
429                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
430
431   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
432
433   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
434
435   TRACE_smpi_comm_out(my_proc_id);
436   log_timed_action (action, clock);
437 }
438
439 static void action_reduce(const char *const *action)
440 {
441   CHECK_ACTION_PARAMS(action, 2, 2)
442   double comm_size = parse_double(action[2]);
443   double comp_size = parse_double(action[3]);
444   double clock = smpi_process()->simulated_elapsed();
445   int root         = (action[4]) ? atoi(action[4]) : 0;
446
447   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
448
449   int my_proc_id = Actor::self()->getPid();
450   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
451                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
452                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
453
454   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
455   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
457   smpi_execute_flops(comp_size);
458
459   TRACE_smpi_comm_out(my_proc_id);
460   log_timed_action (action, clock);
461 }
462
463 static void action_allReduce(const char *const *action) {
464   CHECK_ACTION_PARAMS(action, 2, 1)
465   double comm_size = parse_double(action[2]);
466   double comp_size = parse_double(action[3]);
467
468   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
469
470   double clock = smpi_process()->simulated_elapsed();
471   int my_proc_id = Actor::self()->getPid();
472   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
473                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
474
475   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
476   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
478   smpi_execute_flops(comp_size);
479
480   TRACE_smpi_comm_out(my_proc_id);
481   log_timed_action (action, clock);
482 }
483
484 static void action_allToAll(const char *const *action) {
485   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
486   double clock = smpi_process()->simulated_elapsed();
487   int comm_size = MPI_COMM_WORLD->size();
488   int send_size = parse_double(action[2]);
489   int recv_size = parse_double(action[3]);
490   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
491   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
492
493   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
494   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
495
496   int my_proc_id = Actor::self()->getPid();
497   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
498                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
499                                                     encode_datatype(MPI_CURRENT_TYPE),
500                                                     encode_datatype(MPI_CURRENT_TYPE2)));
501
502   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
503
504   TRACE_smpi_comm_out(my_proc_id);
505   log_timed_action (action, clock);
506 }
507
508 static void action_gather(const char *const *action) {
509   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
510         0 gather 68 68 0 0 0
511       where:
512         1) 68 is the sendcounts
513         2) 68 is the recvcounts
514         3) 0 is the root node
515         4) 0 is the send datatype id, see decode_datatype()
516         5) 0 is the recv datatype id, see decode_datatype()
517   */
518   CHECK_ACTION_PARAMS(action, 2, 3)
519   double clock = smpi_process()->simulated_elapsed();
520   int comm_size = MPI_COMM_WORLD->size();
521   int send_size = parse_double(action[2]);
522   int recv_size = parse_double(action[3]);
523   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
524   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
525
526   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
527   void *recv = nullptr;
528   int root   = (action[4]) ? atoi(action[4]) : 0;
529   int rank = MPI_COMM_WORLD->rank();
530
531   if(rank==root)
532     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
533
534   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
535                                                                         encode_datatype(MPI_CURRENT_TYPE),
536                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
537
538   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
539
540   TRACE_smpi_comm_out(Actor::self()->getPid());
541   log_timed_action (action, clock);
542 }
543
544 static void action_scatter(const char* const* action)
545 {
546   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
547         0 gather 68 68 0 0 0
548       where:
549         1) 68 is the sendcounts
550         2) 68 is the recvcounts
551         3) 0 is the root node
552         4) 0 is the send datatype id, see decode_datatype()
553         5) 0 is the recv datatype id, see decode_datatype()
554   */
555   CHECK_ACTION_PARAMS(action, 2, 3)
556   double clock                   = smpi_process()->simulated_elapsed();
557   int comm_size                  = MPI_COMM_WORLD->size();
558   int send_size                  = parse_double(action[2]);
559   int recv_size                  = parse_double(action[3]);
560   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
561   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
562
563   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
564   void* recv = nullptr;
565   int root   = (action[4]) ? atoi(action[4]) : 0;
566   int rank = MPI_COMM_WORLD->rank();
567
568   if (rank == root)
569     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
570
571   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
572                                                                         encode_datatype(MPI_CURRENT_TYPE),
573                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
574
575   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
576
577   TRACE_smpi_comm_out(Actor::self()->getPid());
578   log_timed_action(action, clock);
579 }
580
581 static void action_gatherv(const char *const *action) {
582   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
583        0 gather 68 68 10 10 10 0 0 0
584      where:
585        1) 68 is the sendcount
586        2) 68 10 10 10 is the recvcounts
587        3) 0 is the root node
588        4) 0 is the send datatype id, see decode_datatype()
589        5) 0 is the recv datatype id, see decode_datatype()
590   */
591   double clock = smpi_process()->simulated_elapsed();
592   int comm_size = MPI_COMM_WORLD->size();
593   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
594   int send_size = parse_double(action[2]);
595   int disps[comm_size];
596   int recvcounts[comm_size];
597   int recv_sum=0;
598
599   MPI_CURRENT_TYPE =
600       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
601   MPI_Datatype MPI_CURRENT_TYPE2{
602       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
603
604   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
605   void *recv = nullptr;
606   for(int i=0;i<comm_size;i++) {
607     recvcounts[i] = atoi(action[i+3]);
608     recv_sum=recv_sum+recvcounts[i];
609     disps[i]=0;
610   }
611
612   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
613   int rank = MPI_COMM_WORLD->rank();
614
615   if(rank==root)
616     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
617
618   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
619
620   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
621                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
622                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
623
624   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
625
626   TRACE_smpi_comm_out(Actor::self()->getPid());
627   log_timed_action (action, clock);
628 }
629
630 static void action_scatterv(const char* const* action)
631 {
632   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
633        0 gather 68 10 10 10 68 0 0 0
634      where:
635        1) 68 10 10 10 is the sendcounts
636        2) 68 is the recvcount
637        3) 0 is the root node
638        4) 0 is the send datatype id, see decode_datatype()
639        5) 0 is the recv datatype id, see decode_datatype()
640   */
641   double clock  = smpi_process()->simulated_elapsed();
642   int comm_size = MPI_COMM_WORLD->size();
643   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
644   int recv_size = parse_double(action[2 + comm_size]);
645   int disps[comm_size];
646   int sendcounts[comm_size];
647   int send_sum = 0;
648
649   MPI_CURRENT_TYPE =
650       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
651   MPI_Datatype MPI_CURRENT_TYPE2{
652       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
653
654   void* send = nullptr;
655   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
656   for (int i = 0; i < comm_size; i++) {
657     sendcounts[i] = atoi(action[i + 2]);
658     send_sum += sendcounts[i];
659     disps[i] = 0;
660   }
661
662   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
663   int rank = MPI_COMM_WORLD->rank();
664
665   if (rank == root)
666     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
667
668   std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
669
670   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
671                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
672                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
673
674   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
675
676   TRACE_smpi_comm_out(Actor::self()->getPid());
677   log_timed_action(action, clock);
678 }
679
680 static void action_reducescatter(const char *const *action) {
681  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
682       0 reduceScatter 275427 275427 275427 204020 11346849 0
683     where:
684       1) The first four values after the name of the action declare the recvcounts array
685       2) The value 11346849 is the amount of instructions
686       3) The last value corresponds to the datatype, see decode_datatype().
687 */
688   double clock = smpi_process()->simulated_elapsed();
689   int comm_size = MPI_COMM_WORLD->size();
690   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
691   int comp_size = parse_double(action[2+comm_size]);
692   int my_proc_id                     = Actor::self()->getPid();
693   std::vector<int>* trace_recvcounts = new std::vector<int>;
694   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
695
696   for(int i=0;i<comm_size;i++) {
697     trace_recvcounts->push_back(atoi(action[i + 2]));
698   }
699   int size{std::accumulate(trace_recvcounts->begin(), trace_recvcounts->end(), 0)};
700
701   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
702                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
703                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
704                                                        encode_datatype(MPI_CURRENT_TYPE)));
705
706   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
707   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
708
709   Colls::reduce_scatter(sendbuf, recvbuf, trace_recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
710   smpi_execute_flops(comp_size);
711
712   TRACE_smpi_comm_out(my_proc_id);
713   log_timed_action (action, clock);
714 }
715
716 static void action_allgather(const char *const *action) {
717   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
718         0 allGather 275427 275427
719     where:
720         1) 275427 is the sendcount
721         2) 275427 is the recvcount
722         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
723   */
724   double clock = smpi_process()->simulated_elapsed();
725
726   CHECK_ACTION_PARAMS(action, 2, 2)
727   int sendcount=atoi(action[2]);
728   int recvcount=atoi(action[3]);
729
730   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
731   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
732
733   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
734   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
735
736   int my_proc_id = Actor::self()->getPid();
737
738   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
739                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
740                                                     encode_datatype(MPI_CURRENT_TYPE),
741                                                     encode_datatype(MPI_CURRENT_TYPE2)));
742
743   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
744
745   TRACE_smpi_comm_out(my_proc_id);
746   log_timed_action (action, clock);
747 }
748
749 static void action_allgatherv(const char *const *action) {
750   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
751         0 allGatherV 275427 275427 275427 275427 204020
752      where:
753         1) 275427 is the sendcount
754         2) The next four elements declare the recvcounts array
755         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
756   */
757   double clock = smpi_process()->simulated_elapsed();
758
759   int comm_size = MPI_COMM_WORLD->size();
760   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
761   int sendcount=atoi(action[2]);
762   int recvcounts[comm_size];
763   int disps[comm_size];
764   int recv_sum=0;
765
766   MPI_CURRENT_TYPE =
767       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
768   MPI_Datatype MPI_CURRENT_TYPE2{
769       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
770
771   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
772
773   for(int i=0;i<comm_size;i++) {
774     recvcounts[i] = atoi(action[i+3]);
775     recv_sum=recv_sum+recvcounts[i];
776     disps[i] = 0;
777   }
778   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
779
780   int my_proc_id = Actor::self()->getPid();
781
782   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
783
784   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
785                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
786                                                        encode_datatype(MPI_CURRENT_TYPE),
787                                                        encode_datatype(MPI_CURRENT_TYPE2)));
788
789   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
790                           MPI_COMM_WORLD);
791
792   TRACE_smpi_comm_out(my_proc_id);
793   log_timed_action (action, clock);
794 }
795
796 static void action_allToAllv(const char *const *action) {
797   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
798         0 allToAllV 100 1 7 10 12 100 1 70 10 5
799      where:
800         1) 100 is the size of the send buffer *sizeof(int),
801         2) 1 7 10 12 is the sendcounts array
802         3) 100*sizeof(int) is the size of the receiver buffer
803         4)  1 70 10 5 is the recvcounts array
804   */
805   double clock = smpi_process()->simulated_elapsed();
806
807   int comm_size = MPI_COMM_WORLD->size();
808   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
809   int send_size = 0;
810   int recv_size = 0;
811   int sendcounts[comm_size];
812   std::vector<int>* trace_sendcounts = new std::vector<int>;
813   int recvcounts[comm_size];
814   std::vector<int>* trace_recvcounts = new std::vector<int>;
815   int senddisps[comm_size];
816   int recvdisps[comm_size];
817
818   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
819                          ? decode_datatype(action[4 + 2 * comm_size])
820                          : MPI_DEFAULT_TYPE;
821   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
822                                      ? decode_datatype(action[5 + 2 * comm_size])
823                                      : MPI_DEFAULT_TYPE};
824
825   int send_buf_size=parse_double(action[2]);
826   int recv_buf_size=parse_double(action[3+comm_size]);
827   int my_proc_id = Actor::self()->getPid();
828   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
829   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
830
831   for(int i=0;i<comm_size;i++) {
832     sendcounts[i] = atoi(action[i+3]);
833     trace_sendcounts->push_back(sendcounts[i]);
834     send_size += sendcounts[i];
835     recvcounts[i] = atoi(action[i+4+comm_size]);
836     trace_recvcounts->push_back(recvcounts[i]);
837     recv_size += recvcounts[i];
838     senddisps[i] = 0;
839     recvdisps[i] = 0;
840   }
841
842   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
843                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
844                                                        trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
845                                                        encode_datatype(MPI_CURRENT_TYPE2)));
846
847   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
848                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
849
850   TRACE_smpi_comm_out(my_proc_id);
851   log_timed_action (action, clock);
852 }
853
854 }} // namespace simgrid::smpi
855
856 /** @brief Only initialize the replay, don't do it for real */
857 void smpi_replay_init(int* argc, char*** argv)
858 {
859   simgrid::smpi::Process::init(argc, argv);
860   smpi_process()->mark_as_initialized();
861   smpi_process()->set_replaying(true);
862
863   int my_proc_id = Actor::self()->getPid();
864   TRACE_smpi_init(my_proc_id);
865   TRACE_smpi_computing_init(my_proc_id);
866   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
867   TRACE_smpi_comm_out(my_proc_id);
868   xbt_replay_action_register("init",       simgrid::smpi::action_init);
869   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
870   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
871   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
872   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
873   xbt_replay_action_register("send",       simgrid::smpi::action_send);
874   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
875   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
876   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
877   xbt_replay_action_register("test",       simgrid::smpi::action_test);
878   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
879   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
880   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
881   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
882   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
883   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
884   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
885   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
886   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
887   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
888   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
889   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
890   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
891   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
892   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
893   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
894
895   //if we have a delayed start, sleep here.
896   if(*argc>2){
897     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
898     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
899     smpi_execute_flops(value);
900   } else {
901     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
902     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
903     smpi_execute_flops(0.0);
904   }
905 }
906
907 /** @brief actually run the replay after initialization */
908 void smpi_replay_main(int* argc, char*** argv)
909 {
910   simgrid::xbt::replay_runner(*argc, *argv);
911
912   /* and now, finalize everything */
913   /* One active process will stop. Decrease the counter*/
914   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
915   if (not get_reqq_self()->empty()) {
916     unsigned int count_requests=get_reqq_self()->size();
917     MPI_Request requests[count_requests];
918     MPI_Status status[count_requests];
919     unsigned int i=0;
920
921     for (auto const& req : *get_reqq_self()) {
922       requests[i] = req;
923       i++;
924     }
925     simgrid::smpi::Request::waitall(count_requests, requests, status);
926   }
927   delete get_reqq_self();
928   active_processes--;
929
930   if(active_processes==0){
931     /* Last process alive speaking: end the simulated timer */
932     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
933     xbt_free(sendbuffer);
934     xbt_free(recvbuffer);
935   }
936
937   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
938
939   smpi_process()->finalize();
940
941   TRACE_smpi_comm_out(Actor::self()->getPid());
942   TRACE_smpi_finalize(Actor::self()->getPid());
943 }
944
945 /** @brief chain a replay initialization and a replay start */
946 void smpi_replay_run(int* argc, char*** argv)
947 {
948   smpi_replay_init(argc, argv);
949   smpi_replay_main(argc, argv);
950 }