Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Clean initialization of MPI_CURRENT_TYPE and root
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 int communicator_size = 0;
25 static int active_processes = 0;
26 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
27
28 MPI_Datatype MPI_DEFAULT_TYPE;
29 MPI_Datatype MPI_CURRENT_TYPE;
30
31 static int sendbuffer_size=0;
32 char* sendbuffer=nullptr;
33 static int recvbuffer_size=0;
34 char* recvbuffer=nullptr;
35
36 static void log_timed_action (const char *const *action, double clock){
37   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38     char *name = xbt_str_join_array(action, " ");
39     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
40     xbt_free(name);
41   }
42 }
43
44 static std::vector<MPI_Request>* get_reqq_self()
45 {
46   return reqq.at(Actor::self()->getPid());
47 }
48
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 {
51    reqq.insert({Actor::self()->getPid(), mpi_request});
52 }
53
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
56 {
57   if (not smpi_process()->replaying())
58     return xbt_malloc(size);
59   if (sendbuffer_size<size){
60     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
61     sendbuffer_size=size;
62   }
63   return sendbuffer;
64 }
65
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (not smpi_process()->replaying())
69     return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
72     recvbuffer_size=size;
73   }
74   return recvbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (not smpi_process()->replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   char *endptr;
86   double value = strtod(string, &endptr);
87   if (*endptr != '\0')
88     THROWF(unknown_error, 0, "%s is not a double", string);
89   return value;
90 }
91
92
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
94 static MPI_Datatype decode_datatype(const char *const action)
95 {
96   switch(atoi(action)) {
97     case 0:
98       MPI_CURRENT_TYPE=MPI_DOUBLE;
99       break;
100     case 1:
101       MPI_CURRENT_TYPE=MPI_INT;
102       break;
103     case 2:
104       MPI_CURRENT_TYPE=MPI_CHAR;
105       break;
106     case 3:
107       MPI_CURRENT_TYPE=MPI_SHORT;
108       break;
109     case 4:
110       MPI_CURRENT_TYPE=MPI_LONG;
111       break;
112     case 5:
113       MPI_CURRENT_TYPE=MPI_FLOAT;
114       break;
115     case 6:
116       MPI_CURRENT_TYPE=MPI_BYTE;
117       break;
118     default:
119       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
120       break;
121   }
122    return MPI_CURRENT_TYPE;
123 }
124
125 const char* encode_datatype(MPI_Datatype datatype)
126 {
127   if (datatype==MPI_BYTE)
128       return "";
129   if(datatype==MPI_DOUBLE)
130       return "0";
131   if(datatype==MPI_INT)
132       return "1";
133   if(datatype==MPI_CHAR)
134       return "2";
135   if(datatype==MPI_SHORT)
136       return "3";
137   if(datatype==MPI_LONG)
138     return "4";
139   if(datatype==MPI_FLOAT)
140       return "5";
141   // default - not implemented.
142   // do not warn here as we pass in this function even for other trace formats
143   return "-1";
144 }
145
146 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
147     int i=0;\
148     while(action[i]!=nullptr)\
149      i++;\
150     if(i<mandatory+2)                                           \
151     THROWF(arg_error, 0, "%s replay failed.\n" \
152           "%d items were given on the line. First two should be process_id and action.  " \
153           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
154           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
155   }
156
157 namespace simgrid {
158 namespace smpi {
159
160 static void action_init(const char *const *action)
161 {
162   XBT_DEBUG("Initialize the counters");
163   CHECK_ACTION_PARAMS(action, 0, 1)
164   if(action[2])
165     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
166   else
167     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
168
169   /* start a simulated timer */
170   smpi_process()->simulated_start();
171   /*initialize the number of active processes */
172   active_processes = smpi_process_count();
173
174   set_reqq_self(new std::vector<MPI_Request>);
175 }
176
177 static void action_finalize(const char *const *action)
178 {
179   /* Nothing to do */
180 }
181
182 static void action_comm_size(const char *const *action)
183 {
184   communicator_size = parse_double(action[2]);
185   log_timed_action (action, smpi_process()->simulated_elapsed());
186 }
187
188 static void action_comm_split(const char *const *action)
189 {
190   log_timed_action (action, smpi_process()->simulated_elapsed());
191 }
192
193 static void action_comm_dup(const char *const *action)
194 {
195   log_timed_action (action, smpi_process()->simulated_elapsed());
196 }
197
198 static void action_compute(const char *const *action)
199 {
200   CHECK_ACTION_PARAMS(action, 1, 0)
201   double clock = smpi_process()->simulated_elapsed();
202   double flops= parse_double(action[2]);
203   int my_proc_id = Actor::self()->getPid();
204
205   TRACE_smpi_computing_in(my_proc_id, flops);
206   smpi_execute_flops(flops);
207   TRACE_smpi_computing_out(my_proc_id);
208
209   log_timed_action (action, clock);
210 }
211
212 static void action_send(const char *const *action)
213 {
214   CHECK_ACTION_PARAMS(action, 2, 1)
215   int to = atoi(action[2]);
216   double size=parse_double(action[3]);
217   double clock = smpi_process()->simulated_elapsed();
218
219   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
220
221   int my_proc_id = Actor::self()->getPid();
222   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
223
224   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
225                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
226   if (not TRACE_smpi_view_internals())
227     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
228
229   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
230
231   TRACE_smpi_comm_out(my_proc_id);
232
233   log_timed_action(action, clock);
234 }
235
236 static void action_Isend(const char *const *action)
237 {
238   CHECK_ACTION_PARAMS(action, 2, 1)
239   int to = atoi(action[2]);
240   double size=parse_double(action[3]);
241   double clock = smpi_process()->simulated_elapsed();
242
243   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
244
245   int my_proc_id = Actor::self()->getPid();
246   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
247   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
248                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
249   if (not TRACE_smpi_view_internals())
250     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
251
252   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
253
254   TRACE_smpi_comm_out(my_proc_id);
255
256   get_reqq_self()->push_back(request);
257
258   log_timed_action (action, clock);
259 }
260
261 static void action_recv(const char *const *action) {
262   CHECK_ACTION_PARAMS(action, 2, 1)
263   int from = atoi(action[2]);
264   double size=parse_double(action[3]);
265   double clock = smpi_process()->simulated_elapsed();
266   MPI_Status status;
267
268   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
269
270   int my_proc_id = Actor::self()->getPid();
271   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
272
273   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
274                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
275
276   //unknown size from the receiver point of view
277   if (size <= 0.0) {
278     Request::probe(from, 0, MPI_COMM_WORLD, &status);
279     size=status.count;
280   }
281
282   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
283
284   TRACE_smpi_comm_out(my_proc_id);
285   if (not TRACE_smpi_view_internals()) {
286     TRACE_smpi_recv(src_traced, my_proc_id, 0);
287   }
288
289   log_timed_action (action, clock);
290 }
291
292 static void action_Irecv(const char *const *action)
293 {
294   CHECK_ACTION_PARAMS(action, 2, 1)
295   int from = atoi(action[2]);
296   double size=parse_double(action[3]);
297   double clock = smpi_process()->simulated_elapsed();
298
299   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
300
301   int my_proc_id = Actor::self()->getPid();
302   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
303                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
304   MPI_Status status;
305   //unknow size from the receiver pov
306   if (size <= 0.0) {
307     Request::probe(from, 0, MPI_COMM_WORLD, &status);
308     size = status.count;
309   }
310
311   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
312
313   TRACE_smpi_comm_out(my_proc_id);
314   get_reqq_self()->push_back(request);
315
316   log_timed_action (action, clock);
317 }
318
319 static void action_test(const char* const* action)
320 {
321   CHECK_ACTION_PARAMS(action, 0, 0)
322   double clock = smpi_process()->simulated_elapsed();
323   MPI_Status status;
324
325   MPI_Request request = get_reqq_self()->back();
326   get_reqq_self()->pop_back();
327   //if request is null here, this may mean that a previous test has succeeded
328   //Different times in traced application and replayed version may lead to this
329   //In this case, ignore the extra calls.
330   if(request!=nullptr){
331     int my_proc_id = Actor::self()->getPid();
332     TRACE_smpi_testing_in(my_proc_id);
333
334     int flag = Request::test(&request, &status);
335
336     XBT_DEBUG("MPI_Test result: %d", flag);
337     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
338     get_reqq_self()->push_back(request);
339
340     TRACE_smpi_testing_out(my_proc_id);
341   }
342   log_timed_action (action, clock);
343 }
344
345 static void action_wait(const char *const *action){
346   CHECK_ACTION_PARAMS(action, 0, 0)
347   double clock = smpi_process()->simulated_elapsed();
348   MPI_Status status;
349
350   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
351       xbt_str_join_array(action," "));
352   MPI_Request request = get_reqq_self()->back();
353   get_reqq_self()->pop_back();
354
355   if (request==nullptr){
356     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
357     return;
358   }
359
360   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
361
362   MPI_Group group = request->comm()->group();
363   int src_traced = group->rank(request->src());
364   int dst_traced = group->rank(request->dst());
365   int is_wait_for_receive = (request->flags() & RECV);
366   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
367
368   Request::wait(&request, &status);
369
370   TRACE_smpi_comm_out(rank);
371   if (is_wait_for_receive)
372     TRACE_smpi_recv(src_traced, dst_traced, 0);
373   log_timed_action (action, clock);
374 }
375
376 static void action_waitall(const char *const *action){
377   CHECK_ACTION_PARAMS(action, 0, 0)
378   double clock = smpi_process()->simulated_elapsed();
379   const unsigned int count_requests = get_reqq_self()->size();
380
381   if (count_requests>0) {
382     MPI_Status status[count_requests];
383
384     int my_proc_id_traced = Actor::self()->getPid();
385     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
386                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
387     int recvs_snd[count_requests];
388     int recvs_rcv[count_requests];
389     for (unsigned int i = 0; i < count_requests; i++) {
390       const auto& req = (*get_reqq_self())[i];
391       if (req && (req->flags() & RECV)) {
392         recvs_snd[i] = req->src();
393         recvs_rcv[i] = req->dst();
394       } else
395         recvs_snd[i] = -100;
396    }
397    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
398
399    for (unsigned i = 0; i < count_requests; i++) {
400      if (recvs_snd[i]!=-100)
401        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
402    }
403    TRACE_smpi_comm_out(my_proc_id_traced);
404   }
405   log_timed_action (action, clock);
406 }
407
408 static void action_barrier(const char *const *action){
409   double clock = smpi_process()->simulated_elapsed();
410   int my_proc_id = Actor::self()->getPid();
411   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
412
413   Colls::barrier(MPI_COMM_WORLD);
414
415   TRACE_smpi_comm_out(my_proc_id);
416   log_timed_action (action, clock);
417 }
418
419 static void action_bcast(const char *const *action)
420 {
421   CHECK_ACTION_PARAMS(action, 1, 2)
422   double size = parse_double(action[2]);
423   double clock = smpi_process()->simulated_elapsed();
424   int root     = (action[3]) ? atoi(action[3]) : 0;
425   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
426   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
427
428   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
429
430   int my_proc_id = Actor::self()->getPid();
431   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
432                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
433                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
434
435   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
436
437   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
438
439   TRACE_smpi_comm_out(my_proc_id);
440   log_timed_action (action, clock);
441 }
442
443 static void action_reduce(const char *const *action)
444 {
445   CHECK_ACTION_PARAMS(action, 2, 2)
446   double comm_size = parse_double(action[2]);
447   double comp_size = parse_double(action[3]);
448   double clock = smpi_process()->simulated_elapsed();
449   int root         = (action[4]) ? atoi(action[4]) : 0;
450
451   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
452
453   int my_proc_id = Actor::self()->getPid();
454   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
455                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
456                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
457
458   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
459   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
460   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
461   smpi_execute_flops(comp_size);
462
463   TRACE_smpi_comm_out(my_proc_id);
464   log_timed_action (action, clock);
465 }
466
467 static void action_allReduce(const char *const *action) {
468   CHECK_ACTION_PARAMS(action, 2, 1)
469   double comm_size = parse_double(action[2]);
470   double comp_size = parse_double(action[3]);
471
472   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
473
474   double clock = smpi_process()->simulated_elapsed();
475   int my_proc_id = Actor::self()->getPid();
476   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
477                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
478
479   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
481   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
482   smpi_execute_flops(comp_size);
483
484   TRACE_smpi_comm_out(my_proc_id);
485   log_timed_action (action, clock);
486 }
487
488 static void action_allToAll(const char *const *action) {
489   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
490   double clock = smpi_process()->simulated_elapsed();
491   int comm_size = MPI_COMM_WORLD->size();
492   int send_size = parse_double(action[2]);
493   int recv_size = parse_double(action[3]);
494   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
495   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
496
497   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
498   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
499
500   int my_proc_id = Actor::self()->getPid();
501   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
502                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
503                                                     encode_datatype(MPI_CURRENT_TYPE),
504                                                     encode_datatype(MPI_CURRENT_TYPE2)));
505
506   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
507
508   TRACE_smpi_comm_out(my_proc_id);
509   log_timed_action (action, clock);
510 }
511
512 static void action_gather(const char *const *action) {
513   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
514         0 gather 68 68 0 0 0
515       where:
516         1) 68 is the sendcounts
517         2) 68 is the recvcounts
518         3) 0 is the root node
519         4) 0 is the send datatype id, see decode_datatype()
520         5) 0 is the recv datatype id, see decode_datatype()
521   */
522   CHECK_ACTION_PARAMS(action, 2, 3)
523   double clock = smpi_process()->simulated_elapsed();
524   int comm_size = MPI_COMM_WORLD->size();
525   int send_size = parse_double(action[2]);
526   int recv_size = parse_double(action[3]);
527   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
528   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
529
530   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
531   void *recv = nullptr;
532   int root   = (action[4]) ? atoi(action[4]) : 0;
533   int rank = MPI_COMM_WORLD->rank();
534
535   if(rank==root)
536     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
537
538   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
539                                                                         encode_datatype(MPI_CURRENT_TYPE),
540                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
541
542   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
543
544   TRACE_smpi_comm_out(Actor::self()->getPid());
545   log_timed_action (action, clock);
546 }
547
548 static void action_scatter(const char* const* action)
549 {
550   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
551         0 gather 68 68 0 0 0
552       where:
553         1) 68 is the sendcounts
554         2) 68 is the recvcounts
555         3) 0 is the root node
556         4) 0 is the send datatype id, see decode_datatype()
557         5) 0 is the recv datatype id, see decode_datatype()
558   */
559   CHECK_ACTION_PARAMS(action, 2, 3)
560   double clock                   = smpi_process()->simulated_elapsed();
561   int comm_size                  = MPI_COMM_WORLD->size();
562   int send_size                  = parse_double(action[2]);
563   int recv_size                  = parse_double(action[3]);
564   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
565   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
566
567   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
568   void* recv = nullptr;
569   int root   = (action[4]) ? atoi(action[4]) : 0;
570   int rank = MPI_COMM_WORLD->rank();
571
572   if (rank == root)
573     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
574
575   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
576                                                                         encode_datatype(MPI_CURRENT_TYPE),
577                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
578
579   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
580
581   TRACE_smpi_comm_out(Actor::self()->getPid());
582   log_timed_action(action, clock);
583 }
584
585 static void action_gatherv(const char *const *action) {
586   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
587        0 gather 68 68 10 10 10 0 0 0
588      where:
589        1) 68 is the sendcount
590        2) 68 10 10 10 is the recvcounts
591        3) 0 is the root node
592        4) 0 is the send datatype id, see decode_datatype()
593        5) 0 is the recv datatype id, see decode_datatype()
594   */
595   double clock = smpi_process()->simulated_elapsed();
596   int comm_size = MPI_COMM_WORLD->size();
597   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
598   int send_size = parse_double(action[2]);
599   int disps[comm_size];
600   int recvcounts[comm_size];
601   int recv_sum=0;
602
603   MPI_CURRENT_TYPE =
604       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
605   MPI_Datatype MPI_CURRENT_TYPE2{
606       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
607
608   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
609   void *recv = nullptr;
610   for(int i=0;i<comm_size;i++) {
611     recvcounts[i] = atoi(action[i+3]);
612     recv_sum=recv_sum+recvcounts[i];
613     disps[i]=0;
614   }
615
616   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
617   int rank = MPI_COMM_WORLD->rank();
618
619   if(rank==root)
620     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
621
622   std::vector<int>* trace_recvcounts = new std::vector<int>;
623   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
624     trace_recvcounts->push_back(recvcounts[i]);
625
626   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
627                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
628                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
629
630   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
631
632   TRACE_smpi_comm_out(Actor::self()->getPid());
633   log_timed_action (action, clock);
634 }
635
636 static void action_scatterv(const char* const* action)
637 {
638   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
639        0 gather 68 10 10 10 68 0 0 0
640      where:
641        1) 68 10 10 10 is the sendcounts
642        2) 68 is the recvcount
643        3) 0 is the root node
644        4) 0 is the send datatype id, see decode_datatype()
645        5) 0 is the recv datatype id, see decode_datatype()
646   */
647   double clock  = smpi_process()->simulated_elapsed();
648   int comm_size = MPI_COMM_WORLD->size();
649   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
650   int recv_size = parse_double(action[2 + comm_size]);
651   int disps[comm_size];
652   int sendcounts[comm_size];
653   int send_sum = 0;
654
655   MPI_CURRENT_TYPE =
656       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
657   MPI_Datatype MPI_CURRENT_TYPE2{
658       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
659
660   void* send = nullptr;
661   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
662   for (int i = 0; i < comm_size; i++) {
663     sendcounts[i] = atoi(action[i + 2]);
664     send_sum += sendcounts[i];
665     disps[i] = 0;
666   }
667
668   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
669   int rank = MPI_COMM_WORLD->rank();
670
671   if (rank == root)
672     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
673
674   std::vector<int>* trace_sendcounts = new std::vector<int>;
675   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
676     trace_sendcounts->push_back(sendcounts[i]);
677
678   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
679                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
680                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
681
682   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
683
684   TRACE_smpi_comm_out(Actor::self()->getPid());
685   log_timed_action(action, clock);
686 }
687
688 static void action_reducescatter(const char *const *action) {
689  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
690       0 reduceScatter 275427 275427 275427 204020 11346849 0
691     where:
692       1) The first four values after the name of the action declare the recvcounts array
693       2) The value 11346849 is the amount of instructions
694       3) The last value corresponds to the datatype, see decode_datatype().
695 */
696   double clock = smpi_process()->simulated_elapsed();
697   int comm_size = MPI_COMM_WORLD->size();
698   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
699   int comp_size = parse_double(action[2+comm_size]);
700   int recvcounts[comm_size];
701   int my_proc_id                     = Actor::self()->getPid();
702   int size = 0;
703   std::vector<int>* trace_recvcounts = new std::vector<int>;
704   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
705
706   for(int i=0;i<comm_size;i++) {
707     recvcounts[i] = atoi(action[i+2]);
708     trace_recvcounts->push_back(recvcounts[i]);
709     size+=recvcounts[i];
710   }
711
712   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
713                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
714                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
715                                                        encode_datatype(MPI_CURRENT_TYPE)));
716
717   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
718   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
719
720   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
721   smpi_execute_flops(comp_size);
722
723   TRACE_smpi_comm_out(my_proc_id);
724   log_timed_action (action, clock);
725 }
726
727 static void action_allgather(const char *const *action) {
728   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
729         0 allGather 275427 275427
730     where:
731         1) 275427 is the sendcount
732         2) 275427 is the recvcount
733         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
734   */
735   double clock = smpi_process()->simulated_elapsed();
736
737   CHECK_ACTION_PARAMS(action, 2, 2)
738   int sendcount=atoi(action[2]);
739   int recvcount=atoi(action[3]);
740
741   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
742   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
743
744   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
745   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
746
747   int my_proc_id = Actor::self()->getPid();
748
749   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
750                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
751                                                     encode_datatype(MPI_CURRENT_TYPE),
752                                                     encode_datatype(MPI_CURRENT_TYPE2)));
753
754   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
755
756   TRACE_smpi_comm_out(my_proc_id);
757   log_timed_action (action, clock);
758 }
759
760 static void action_allgatherv(const char *const *action) {
761   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
762         0 allGatherV 275427 275427 275427 275427 204020
763      where:
764         1) 275427 is the sendcount
765         2) The next four elements declare the recvcounts array
766         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
767   */
768   double clock = smpi_process()->simulated_elapsed();
769
770   int comm_size = MPI_COMM_WORLD->size();
771   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
772   int sendcount=atoi(action[2]);
773   int recvcounts[comm_size];
774   int disps[comm_size];
775   int recv_sum=0;
776
777   MPI_CURRENT_TYPE =
778       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
779   MPI_Datatype MPI_CURRENT_TYPE2{
780       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
781
782   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
783
784   for(int i=0;i<comm_size;i++) {
785     recvcounts[i] = atoi(action[i+3]);
786     recv_sum=recv_sum+recvcounts[i];
787     disps[i] = 0;
788   }
789   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
790
791   int my_proc_id = Actor::self()->getPid();
792
793   std::vector<int>* trace_recvcounts = new std::vector<int>;
794   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
795     trace_recvcounts->push_back(recvcounts[i]);
796
797   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
798                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
799                                                        encode_datatype(MPI_CURRENT_TYPE),
800                                                        encode_datatype(MPI_CURRENT_TYPE2)));
801
802   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
803                           MPI_COMM_WORLD);
804
805   TRACE_smpi_comm_out(my_proc_id);
806   log_timed_action (action, clock);
807 }
808
809 static void action_allToAllv(const char *const *action) {
810   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
811         0 allToAllV 100 1 7 10 12 100 1 70 10 5
812      where:
813         1) 100 is the size of the send buffer *sizeof(int),
814         2) 1 7 10 12 is the sendcounts array
815         3) 100*sizeof(int) is the size of the receiver buffer
816         4)  1 70 10 5 is the recvcounts array
817   */
818   double clock = smpi_process()->simulated_elapsed();
819
820   int comm_size = MPI_COMM_WORLD->size();
821   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
822   int send_size = 0;
823   int recv_size = 0;
824   int sendcounts[comm_size];
825   std::vector<int>* trace_sendcounts = new std::vector<int>;
826   int recvcounts[comm_size];
827   std::vector<int>* trace_recvcounts = new std::vector<int>;
828   int senddisps[comm_size];
829   int recvdisps[comm_size];
830
831   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
832                          ? decode_datatype(action[4 + 2 * comm_size])
833                          : MPI_DEFAULT_TYPE;
834   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
835                                      ? decode_datatype(action[5 + 2 * comm_size])
836                                      : MPI_DEFAULT_TYPE};
837
838   int send_buf_size=parse_double(action[2]);
839   int recv_buf_size=parse_double(action[3+comm_size]);
840   int my_proc_id = Actor::self()->getPid();
841   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
842   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
843
844   for(int i=0;i<comm_size;i++) {
845     sendcounts[i] = atoi(action[i+3]);
846     trace_sendcounts->push_back(sendcounts[i]);
847     send_size += sendcounts[i];
848     recvcounts[i] = atoi(action[i+4+comm_size]);
849     trace_recvcounts->push_back(recvcounts[i]);
850     recv_size += recvcounts[i];
851     senddisps[i] = 0;
852     recvdisps[i] = 0;
853   }
854
855   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
856                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
857                                                        trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
858                                                        encode_datatype(MPI_CURRENT_TYPE2)));
859
860   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
861                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
862
863   TRACE_smpi_comm_out(my_proc_id);
864   log_timed_action (action, clock);
865 }
866
867 }} // namespace simgrid::smpi
868
869 /** @brief Only initialize the replay, don't do it for real */
870 void smpi_replay_init(int* argc, char*** argv)
871 {
872   simgrid::smpi::Process::init(argc, argv);
873   smpi_process()->mark_as_initialized();
874   smpi_process()->set_replaying(true);
875
876   int my_proc_id = Actor::self()->getPid();
877   TRACE_smpi_init(my_proc_id);
878   TRACE_smpi_computing_init(my_proc_id);
879   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
880   TRACE_smpi_comm_out(my_proc_id);
881   xbt_replay_action_register("init",       simgrid::smpi::action_init);
882   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
883   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
884   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
885   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
886   xbt_replay_action_register("send",       simgrid::smpi::action_send);
887   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
888   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
889   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
890   xbt_replay_action_register("test",       simgrid::smpi::action_test);
891   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
892   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
893   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
894   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
895   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
896   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
897   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
898   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
899   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
900   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
901   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
902   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
903   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
904   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
905   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
906   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
907
908   //if we have a delayed start, sleep here.
909   if(*argc>2){
910     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
911     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
912     smpi_execute_flops(value);
913   } else {
914     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
915     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
916     smpi_execute_flops(0.0);
917   }
918 }
919
920 /** @brief actually run the replay after initialization */
921 void smpi_replay_main(int* argc, char*** argv)
922 {
923   simgrid::xbt::replay_runner(*argc, *argv);
924
925   /* and now, finalize everything */
926   /* One active process will stop. Decrease the counter*/
927   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
928   if (not get_reqq_self()->empty()) {
929     unsigned int count_requests=get_reqq_self()->size();
930     MPI_Request requests[count_requests];
931     MPI_Status status[count_requests];
932     unsigned int i=0;
933
934     for (auto const& req : *get_reqq_self()) {
935       requests[i] = req;
936       i++;
937     }
938     simgrid::smpi::Request::waitall(count_requests, requests, status);
939   }
940   delete get_reqq_self();
941   active_processes--;
942
943   if(active_processes==0){
944     /* Last process alive speaking: end the simulated timer */
945     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
946     xbt_free(sendbuffer);
947     xbt_free(recvbuffer);
948   }
949
950   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
951
952   smpi_process()->finalize();
953
954   TRACE_smpi_comm_out(Actor::self()->getPid());
955   TRACE_smpi_finalize(Actor::self()->getPid());
956 }
957
958 /** @brief chain a replay initialization and a replay start */
959 void smpi_replay_run(int* argc, char*** argv)
960 {
961   smpi_replay_init(argc, argv);
962   smpi_replay_main(argc, argv);
963 }