Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Use std::vector copy-constr. instead of loop
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 int communicator_size = 0;
25 static int active_processes = 0;
26 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
27
28 MPI_Datatype MPI_DEFAULT_TYPE;
29 MPI_Datatype MPI_CURRENT_TYPE;
30
31 static int sendbuffer_size=0;
32 char* sendbuffer=nullptr;
33 static int recvbuffer_size=0;
34 char* recvbuffer=nullptr;
35
36 static void log_timed_action (const char *const *action, double clock){
37   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38     char *name = xbt_str_join_array(action, " ");
39     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
40     xbt_free(name);
41   }
42 }
43
44 static std::vector<MPI_Request>* get_reqq_self()
45 {
46   return reqq.at(Actor::self()->getPid());
47 }
48
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 {
51    reqq.insert({Actor::self()->getPid(), mpi_request});
52 }
53
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
56 {
57   if (not smpi_process()->replaying())
58     return xbt_malloc(size);
59   if (sendbuffer_size<size){
60     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
61     sendbuffer_size=size;
62   }
63   return sendbuffer;
64 }
65
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (not smpi_process()->replaying())
69     return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
72     recvbuffer_size=size;
73   }
74   return recvbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (not smpi_process()->replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   char *endptr;
86   double value = strtod(string, &endptr);
87   if (*endptr != '\0')
88     THROWF(unknown_error, 0, "%s is not a double", string);
89   return value;
90 }
91
92
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
94 static MPI_Datatype decode_datatype(const char *const action)
95 {
96   switch(atoi(action)) {
97     case 0:
98       MPI_CURRENT_TYPE=MPI_DOUBLE;
99       break;
100     case 1:
101       MPI_CURRENT_TYPE=MPI_INT;
102       break;
103     case 2:
104       MPI_CURRENT_TYPE=MPI_CHAR;
105       break;
106     case 3:
107       MPI_CURRENT_TYPE=MPI_SHORT;
108       break;
109     case 4:
110       MPI_CURRENT_TYPE=MPI_LONG;
111       break;
112     case 5:
113       MPI_CURRENT_TYPE=MPI_FLOAT;
114       break;
115     case 6:
116       MPI_CURRENT_TYPE=MPI_BYTE;
117       break;
118     default:
119       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
120       break;
121   }
122    return MPI_CURRENT_TYPE;
123 }
124
125 const char* encode_datatype(MPI_Datatype datatype)
126 {
127   if (datatype==MPI_BYTE)
128       return "";
129   if(datatype==MPI_DOUBLE)
130       return "0";
131   if(datatype==MPI_INT)
132       return "1";
133   if(datatype==MPI_CHAR)
134       return "2";
135   if(datatype==MPI_SHORT)
136       return "3";
137   if(datatype==MPI_LONG)
138     return "4";
139   if(datatype==MPI_FLOAT)
140       return "5";
141   // default - not implemented.
142   // do not warn here as we pass in this function even for other trace formats
143   return "-1";
144 }
145
146 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
147     int i=0;\
148     while(action[i]!=nullptr)\
149      i++;\
150     if(i<mandatory+2)                                           \
151     THROWF(arg_error, 0, "%s replay failed.\n" \
152           "%d items were given on the line. First two should be process_id and action.  " \
153           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
154           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
155   }
156
157 namespace simgrid {
158 namespace smpi {
159
160 static void action_init(const char *const *action)
161 {
162   XBT_DEBUG("Initialize the counters");
163   CHECK_ACTION_PARAMS(action, 0, 1)
164   if(action[2])
165     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
166   else
167     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
168
169   /* start a simulated timer */
170   smpi_process()->simulated_start();
171   /*initialize the number of active processes */
172   active_processes = smpi_process_count();
173
174   set_reqq_self(new std::vector<MPI_Request>);
175 }
176
177 static void action_finalize(const char *const *action)
178 {
179   /* Nothing to do */
180 }
181
182 static void action_comm_size(const char *const *action)
183 {
184   communicator_size = parse_double(action[2]);
185   log_timed_action (action, smpi_process()->simulated_elapsed());
186 }
187
188 static void action_comm_split(const char *const *action)
189 {
190   log_timed_action (action, smpi_process()->simulated_elapsed());
191 }
192
193 static void action_comm_dup(const char *const *action)
194 {
195   log_timed_action (action, smpi_process()->simulated_elapsed());
196 }
197
198 static void action_compute(const char *const *action)
199 {
200   CHECK_ACTION_PARAMS(action, 1, 0)
201   double clock = smpi_process()->simulated_elapsed();
202   double flops= parse_double(action[2]);
203   int my_proc_id = Actor::self()->getPid();
204
205   TRACE_smpi_computing_in(my_proc_id, flops);
206   smpi_execute_flops(flops);
207   TRACE_smpi_computing_out(my_proc_id);
208
209   log_timed_action (action, clock);
210 }
211
212 static void action_send(const char *const *action)
213 {
214   CHECK_ACTION_PARAMS(action, 2, 1)
215   int to = atoi(action[2]);
216   double size=parse_double(action[3]);
217   double clock = smpi_process()->simulated_elapsed();
218
219   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
220
221   int my_proc_id = Actor::self()->getPid();
222   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
223
224   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
225                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
226   if (not TRACE_smpi_view_internals())
227     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
228
229   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
230
231   TRACE_smpi_comm_out(my_proc_id);
232
233   log_timed_action(action, clock);
234 }
235
236 static void action_Isend(const char *const *action)
237 {
238   CHECK_ACTION_PARAMS(action, 2, 1)
239   int to = atoi(action[2]);
240   double size=parse_double(action[3]);
241   double clock = smpi_process()->simulated_elapsed();
242
243   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
244
245   int my_proc_id = Actor::self()->getPid();
246   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
247   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
248                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
249   if (not TRACE_smpi_view_internals())
250     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
251
252   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
253
254   TRACE_smpi_comm_out(my_proc_id);
255
256   get_reqq_self()->push_back(request);
257
258   log_timed_action (action, clock);
259 }
260
261 static void action_recv(const char *const *action) {
262   CHECK_ACTION_PARAMS(action, 2, 1)
263   int from = atoi(action[2]);
264   double size=parse_double(action[3]);
265   double clock = smpi_process()->simulated_elapsed();
266   MPI_Status status;
267
268   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
269
270   int my_proc_id = Actor::self()->getPid();
271   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
272
273   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
274                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
275
276   //unknown size from the receiver point of view
277   if (size <= 0.0) {
278     Request::probe(from, 0, MPI_COMM_WORLD, &status);
279     size=status.count;
280   }
281
282   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
283
284   TRACE_smpi_comm_out(my_proc_id);
285   if (not TRACE_smpi_view_internals()) {
286     TRACE_smpi_recv(src_traced, my_proc_id, 0);
287   }
288
289   log_timed_action (action, clock);
290 }
291
292 static void action_Irecv(const char *const *action)
293 {
294   CHECK_ACTION_PARAMS(action, 2, 1)
295   int from = atoi(action[2]);
296   double size=parse_double(action[3]);
297   double clock = smpi_process()->simulated_elapsed();
298
299   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
300
301   int my_proc_id = Actor::self()->getPid();
302   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
303                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
304   MPI_Status status;
305   //unknow size from the receiver pov
306   if (size <= 0.0) {
307     Request::probe(from, 0, MPI_COMM_WORLD, &status);
308     size = status.count;
309   }
310
311   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
312
313   TRACE_smpi_comm_out(my_proc_id);
314   get_reqq_self()->push_back(request);
315
316   log_timed_action (action, clock);
317 }
318
319 static void action_test(const char* const* action)
320 {
321   CHECK_ACTION_PARAMS(action, 0, 0)
322   double clock = smpi_process()->simulated_elapsed();
323   MPI_Status status;
324
325   MPI_Request request = get_reqq_self()->back();
326   get_reqq_self()->pop_back();
327   //if request is null here, this may mean that a previous test has succeeded
328   //Different times in traced application and replayed version may lead to this
329   //In this case, ignore the extra calls.
330   if(request!=nullptr){
331     int my_proc_id = Actor::self()->getPid();
332     TRACE_smpi_testing_in(my_proc_id);
333
334     int flag = Request::test(&request, &status);
335
336     XBT_DEBUG("MPI_Test result: %d", flag);
337     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
338     get_reqq_self()->push_back(request);
339
340     TRACE_smpi_testing_out(my_proc_id);
341   }
342   log_timed_action (action, clock);
343 }
344
345 static void action_wait(const char *const *action){
346   CHECK_ACTION_PARAMS(action, 0, 0)
347   double clock = smpi_process()->simulated_elapsed();
348   MPI_Status status;
349
350   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
351       xbt_str_join_array(action," "));
352   MPI_Request request = get_reqq_self()->back();
353   get_reqq_self()->pop_back();
354
355   if (request==nullptr){
356     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
357     return;
358   }
359
360   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
361
362   MPI_Group group = request->comm()->group();
363   int src_traced = group->rank(request->src());
364   int dst_traced = group->rank(request->dst());
365   int is_wait_for_receive = (request->flags() & RECV);
366   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
367
368   Request::wait(&request, &status);
369
370   TRACE_smpi_comm_out(rank);
371   if (is_wait_for_receive)
372     TRACE_smpi_recv(src_traced, dst_traced, 0);
373   log_timed_action (action, clock);
374 }
375
376 static void action_waitall(const char *const *action){
377   CHECK_ACTION_PARAMS(action, 0, 0)
378   double clock = smpi_process()->simulated_elapsed();
379   const unsigned int count_requests = get_reqq_self()->size();
380
381   if (count_requests>0) {
382     MPI_Status status[count_requests];
383
384     int my_proc_id_traced = Actor::self()->getPid();
385     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
386                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
387     int recvs_snd[count_requests];
388     int recvs_rcv[count_requests];
389     for (unsigned int i = 0; i < count_requests; i++) {
390       const auto& req = (*get_reqq_self())[i];
391       if (req && (req->flags() & RECV)) {
392         recvs_snd[i] = req->src();
393         recvs_rcv[i] = req->dst();
394       } else
395         recvs_snd[i] = -100;
396    }
397    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
398
399    for (unsigned i = 0; i < count_requests; i++) {
400      if (recvs_snd[i]!=-100)
401        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
402    }
403    TRACE_smpi_comm_out(my_proc_id_traced);
404   }
405   log_timed_action (action, clock);
406 }
407
408 static void action_barrier(const char *const *action){
409   double clock = smpi_process()->simulated_elapsed();
410   int my_proc_id = Actor::self()->getPid();
411   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
412
413   Colls::barrier(MPI_COMM_WORLD);
414
415   TRACE_smpi_comm_out(my_proc_id);
416   log_timed_action (action, clock);
417 }
418
419 static void action_bcast(const char *const *action)
420 {
421   CHECK_ACTION_PARAMS(action, 1, 2)
422   double size = parse_double(action[2]);
423   double clock = smpi_process()->simulated_elapsed();
424   int root     = (action[3]) ? atoi(action[3]) : 0;
425   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
426   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
427
428   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
429
430   int my_proc_id = Actor::self()->getPid();
431   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
432                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
433                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
434
435   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
436
437   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
438
439   TRACE_smpi_comm_out(my_proc_id);
440   log_timed_action (action, clock);
441 }
442
443 static void action_reduce(const char *const *action)
444 {
445   CHECK_ACTION_PARAMS(action, 2, 2)
446   double comm_size = parse_double(action[2]);
447   double comp_size = parse_double(action[3]);
448   double clock = smpi_process()->simulated_elapsed();
449   int root         = (action[4]) ? atoi(action[4]) : 0;
450
451   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
452
453   int my_proc_id = Actor::self()->getPid();
454   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
455                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
456                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
457
458   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
459   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
460   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
461   smpi_execute_flops(comp_size);
462
463   TRACE_smpi_comm_out(my_proc_id);
464   log_timed_action (action, clock);
465 }
466
467 static void action_allReduce(const char *const *action) {
468   CHECK_ACTION_PARAMS(action, 2, 1)
469   double comm_size = parse_double(action[2]);
470   double comp_size = parse_double(action[3]);
471
472   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
473
474   double clock = smpi_process()->simulated_elapsed();
475   int my_proc_id = Actor::self()->getPid();
476   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
477                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
478
479   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
481   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
482   smpi_execute_flops(comp_size);
483
484   TRACE_smpi_comm_out(my_proc_id);
485   log_timed_action (action, clock);
486 }
487
488 static void action_allToAll(const char *const *action) {
489   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
490   double clock = smpi_process()->simulated_elapsed();
491   int comm_size = MPI_COMM_WORLD->size();
492   int send_size = parse_double(action[2]);
493   int recv_size = parse_double(action[3]);
494   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
495   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
496
497   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
498   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
499
500   int my_proc_id = Actor::self()->getPid();
501   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
502                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
503                                                     encode_datatype(MPI_CURRENT_TYPE),
504                                                     encode_datatype(MPI_CURRENT_TYPE2)));
505
506   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
507
508   TRACE_smpi_comm_out(my_proc_id);
509   log_timed_action (action, clock);
510 }
511
512 static void action_gather(const char *const *action) {
513   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
514         0 gather 68 68 0 0 0
515       where:
516         1) 68 is the sendcounts
517         2) 68 is the recvcounts
518         3) 0 is the root node
519         4) 0 is the send datatype id, see decode_datatype()
520         5) 0 is the recv datatype id, see decode_datatype()
521   */
522   CHECK_ACTION_PARAMS(action, 2, 3)
523   double clock = smpi_process()->simulated_elapsed();
524   int comm_size = MPI_COMM_WORLD->size();
525   int send_size = parse_double(action[2]);
526   int recv_size = parse_double(action[3]);
527   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
528   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
529
530   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
531   void *recv = nullptr;
532   int root   = (action[4]) ? atoi(action[4]) : 0;
533   int rank = MPI_COMM_WORLD->rank();
534
535   if(rank==root)
536     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
537
538   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
539                                                                         encode_datatype(MPI_CURRENT_TYPE),
540                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
541
542   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
543
544   TRACE_smpi_comm_out(Actor::self()->getPid());
545   log_timed_action (action, clock);
546 }
547
548 static void action_scatter(const char* const* action)
549 {
550   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
551         0 gather 68 68 0 0 0
552       where:
553         1) 68 is the sendcounts
554         2) 68 is the recvcounts
555         3) 0 is the root node
556         4) 0 is the send datatype id, see decode_datatype()
557         5) 0 is the recv datatype id, see decode_datatype()
558   */
559   CHECK_ACTION_PARAMS(action, 2, 3)
560   double clock                   = smpi_process()->simulated_elapsed();
561   int comm_size                  = MPI_COMM_WORLD->size();
562   int send_size                  = parse_double(action[2]);
563   int recv_size                  = parse_double(action[3]);
564   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
565   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
566
567   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
568   void* recv = nullptr;
569   int root   = (action[4]) ? atoi(action[4]) : 0;
570   int rank = MPI_COMM_WORLD->rank();
571
572   if (rank == root)
573     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
574
575   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
576                                                                         encode_datatype(MPI_CURRENT_TYPE),
577                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
578
579   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
580
581   TRACE_smpi_comm_out(Actor::self()->getPid());
582   log_timed_action(action, clock);
583 }
584
585 static void action_gatherv(const char *const *action) {
586   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
587        0 gather 68 68 10 10 10 0 0 0
588      where:
589        1) 68 is the sendcount
590        2) 68 10 10 10 is the recvcounts
591        3) 0 is the root node
592        4) 0 is the send datatype id, see decode_datatype()
593        5) 0 is the recv datatype id, see decode_datatype()
594   */
595   double clock = smpi_process()->simulated_elapsed();
596   int comm_size = MPI_COMM_WORLD->size();
597   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
598   int send_size = parse_double(action[2]);
599   int disps[comm_size];
600   int recvcounts[comm_size];
601   int recv_sum=0;
602
603   MPI_CURRENT_TYPE =
604       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
605   MPI_Datatype MPI_CURRENT_TYPE2{
606       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
607
608   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
609   void *recv = nullptr;
610   for(int i=0;i<comm_size;i++) {
611     recvcounts[i] = atoi(action[i+3]);
612     recv_sum=recv_sum+recvcounts[i];
613     disps[i]=0;
614   }
615
616   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
617   int rank = MPI_COMM_WORLD->rank();
618
619   if(rank==root)
620     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
621
622   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
623
624   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
625                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
626                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
627
628   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
629
630   TRACE_smpi_comm_out(Actor::self()->getPid());
631   log_timed_action (action, clock);
632 }
633
634 static void action_scatterv(const char* const* action)
635 {
636   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
637        0 gather 68 10 10 10 68 0 0 0
638      where:
639        1) 68 10 10 10 is the sendcounts
640        2) 68 is the recvcount
641        3) 0 is the root node
642        4) 0 is the send datatype id, see decode_datatype()
643        5) 0 is the recv datatype id, see decode_datatype()
644   */
645   double clock  = smpi_process()->simulated_elapsed();
646   int comm_size = MPI_COMM_WORLD->size();
647   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
648   int recv_size = parse_double(action[2 + comm_size]);
649   int disps[comm_size];
650   int sendcounts[comm_size];
651   int send_sum = 0;
652
653   MPI_CURRENT_TYPE =
654       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
655   MPI_Datatype MPI_CURRENT_TYPE2{
656       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
657
658   void* send = nullptr;
659   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
660   for (int i = 0; i < comm_size; i++) {
661     sendcounts[i] = atoi(action[i + 2]);
662     send_sum += sendcounts[i];
663     disps[i] = 0;
664   }
665
666   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
667   int rank = MPI_COMM_WORLD->rank();
668
669   if (rank == root)
670     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
671
672   std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
673
674   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
675                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
676                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
677
678   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
679
680   TRACE_smpi_comm_out(Actor::self()->getPid());
681   log_timed_action(action, clock);
682 }
683
684 static void action_reducescatter(const char *const *action) {
685  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
686       0 reduceScatter 275427 275427 275427 204020 11346849 0
687     where:
688       1) The first four values after the name of the action declare the recvcounts array
689       2) The value 11346849 is the amount of instructions
690       3) The last value corresponds to the datatype, see decode_datatype().
691 */
692   double clock = smpi_process()->simulated_elapsed();
693   int comm_size = MPI_COMM_WORLD->size();
694   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
695   int comp_size = parse_double(action[2+comm_size]);
696   int recvcounts[comm_size];
697   int my_proc_id                     = Actor::self()->getPid();
698   int size = 0;
699   std::vector<int>* trace_recvcounts = new std::vector<int>;
700   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
701
702   for(int i=0;i<comm_size;i++) {
703     recvcounts[i] = atoi(action[i+2]);
704     trace_recvcounts->push_back(recvcounts[i]);
705     size+=recvcounts[i];
706   }
707
708   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
709                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
710                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
711                                                        encode_datatype(MPI_CURRENT_TYPE)));
712
713   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
714   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
715
716   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
717   smpi_execute_flops(comp_size);
718
719   TRACE_smpi_comm_out(my_proc_id);
720   log_timed_action (action, clock);
721 }
722
723 static void action_allgather(const char *const *action) {
724   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
725         0 allGather 275427 275427
726     where:
727         1) 275427 is the sendcount
728         2) 275427 is the recvcount
729         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
730   */
731   double clock = smpi_process()->simulated_elapsed();
732
733   CHECK_ACTION_PARAMS(action, 2, 2)
734   int sendcount=atoi(action[2]);
735   int recvcount=atoi(action[3]);
736
737   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
738   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
739
740   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
741   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
742
743   int my_proc_id = Actor::self()->getPid();
744
745   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
746                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
747                                                     encode_datatype(MPI_CURRENT_TYPE),
748                                                     encode_datatype(MPI_CURRENT_TYPE2)));
749
750   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
751
752   TRACE_smpi_comm_out(my_proc_id);
753   log_timed_action (action, clock);
754 }
755
756 static void action_allgatherv(const char *const *action) {
757   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
758         0 allGatherV 275427 275427 275427 275427 204020
759      where:
760         1) 275427 is the sendcount
761         2) The next four elements declare the recvcounts array
762         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
763   */
764   double clock = smpi_process()->simulated_elapsed();
765
766   int comm_size = MPI_COMM_WORLD->size();
767   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
768   int sendcount=atoi(action[2]);
769   int recvcounts[comm_size];
770   int disps[comm_size];
771   int recv_sum=0;
772
773   MPI_CURRENT_TYPE =
774       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
775   MPI_Datatype MPI_CURRENT_TYPE2{
776       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
777
778   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
779
780   for(int i=0;i<comm_size;i++) {
781     recvcounts[i] = atoi(action[i+3]);
782     recv_sum=recv_sum+recvcounts[i];
783     disps[i] = 0;
784   }
785   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
786
787   int my_proc_id = Actor::self()->getPid();
788
789   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
790
791   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
792                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
793                                                        encode_datatype(MPI_CURRENT_TYPE),
794                                                        encode_datatype(MPI_CURRENT_TYPE2)));
795
796   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
797                           MPI_COMM_WORLD);
798
799   TRACE_smpi_comm_out(my_proc_id);
800   log_timed_action (action, clock);
801 }
802
803 static void action_allToAllv(const char *const *action) {
804   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
805         0 allToAllV 100 1 7 10 12 100 1 70 10 5
806      where:
807         1) 100 is the size of the send buffer *sizeof(int),
808         2) 1 7 10 12 is the sendcounts array
809         3) 100*sizeof(int) is the size of the receiver buffer
810         4)  1 70 10 5 is the recvcounts array
811   */
812   double clock = smpi_process()->simulated_elapsed();
813
814   int comm_size = MPI_COMM_WORLD->size();
815   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
816   int send_size = 0;
817   int recv_size = 0;
818   int sendcounts[comm_size];
819   std::vector<int>* trace_sendcounts = new std::vector<int>;
820   int recvcounts[comm_size];
821   std::vector<int>* trace_recvcounts = new std::vector<int>;
822   int senddisps[comm_size];
823   int recvdisps[comm_size];
824
825   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
826                          ? decode_datatype(action[4 + 2 * comm_size])
827                          : MPI_DEFAULT_TYPE;
828   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
829                                      ? decode_datatype(action[5 + 2 * comm_size])
830                                      : MPI_DEFAULT_TYPE};
831
832   int send_buf_size=parse_double(action[2]);
833   int recv_buf_size=parse_double(action[3+comm_size]);
834   int my_proc_id = Actor::self()->getPid();
835   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
836   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
837
838   for(int i=0;i<comm_size;i++) {
839     sendcounts[i] = atoi(action[i+3]);
840     trace_sendcounts->push_back(sendcounts[i]);
841     send_size += sendcounts[i];
842     recvcounts[i] = atoi(action[i+4+comm_size]);
843     trace_recvcounts->push_back(recvcounts[i]);
844     recv_size += recvcounts[i];
845     senddisps[i] = 0;
846     recvdisps[i] = 0;
847   }
848
849   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
850                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
851                                                        trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
852                                                        encode_datatype(MPI_CURRENT_TYPE2)));
853
854   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
855                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
856
857   TRACE_smpi_comm_out(my_proc_id);
858   log_timed_action (action, clock);
859 }
860
861 }} // namespace simgrid::smpi
862
863 /** @brief Only initialize the replay, don't do it for real */
864 void smpi_replay_init(int* argc, char*** argv)
865 {
866   simgrid::smpi::Process::init(argc, argv);
867   smpi_process()->mark_as_initialized();
868   smpi_process()->set_replaying(true);
869
870   int my_proc_id = Actor::self()->getPid();
871   TRACE_smpi_init(my_proc_id);
872   TRACE_smpi_computing_init(my_proc_id);
873   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
874   TRACE_smpi_comm_out(my_proc_id);
875   xbt_replay_action_register("init",       simgrid::smpi::action_init);
876   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
877   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
878   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
879   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
880   xbt_replay_action_register("send",       simgrid::smpi::action_send);
881   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
882   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
883   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
884   xbt_replay_action_register("test",       simgrid::smpi::action_test);
885   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
886   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
887   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
888   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
889   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
890   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
891   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
892   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
893   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
894   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
895   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
896   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
897   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
898   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
899   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
900   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
901
902   //if we have a delayed start, sleep here.
903   if(*argc>2){
904     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
905     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
906     smpi_execute_flops(value);
907   } else {
908     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
909     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
910     smpi_execute_flops(0.0);
911   }
912 }
913
914 /** @brief actually run the replay after initialization */
915 void smpi_replay_main(int* argc, char*** argv)
916 {
917   simgrid::xbt::replay_runner(*argc, *argv);
918
919   /* and now, finalize everything */
920   /* One active process will stop. Decrease the counter*/
921   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
922   if (not get_reqq_self()->empty()) {
923     unsigned int count_requests=get_reqq_self()->size();
924     MPI_Request requests[count_requests];
925     MPI_Status status[count_requests];
926     unsigned int i=0;
927
928     for (auto const& req : *get_reqq_self()) {
929       requests[i] = req;
930       i++;
931     }
932     simgrid::smpi::Request::waitall(count_requests, requests, status);
933   }
934   delete get_reqq_self();
935   active_processes--;
936
937   if(active_processes==0){
938     /* Last process alive speaking: end the simulated timer */
939     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
940     xbt_free(sendbuffer);
941     xbt_free(recvbuffer);
942   }
943
944   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
945
946   smpi_process()->finalize();
947
948   TRACE_smpi_comm_out(Actor::self()->getPid());
949   TRACE_smpi_finalize(Actor::self()->getPid());
950 }
951
952 /** @brief chain a replay initialization and a replay start */
953 void smpi_replay_run(int* argc, char*** argv)
954 {
955   smpi_replay_init(argc, argv);
956   smpi_replay_main(argc, argv);
957 }