Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Rename variables from 'rank' to 'my_proc_id' in smpi_replay.cpp
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 int communicator_size = 0;
25 static int active_processes = 0;
26 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
27
28 MPI_Datatype MPI_DEFAULT_TYPE;
29 MPI_Datatype MPI_CURRENT_TYPE;
30
31 static int sendbuffer_size=0;
32 char* sendbuffer=nullptr;
33 static int recvbuffer_size=0;
34 char* recvbuffer=nullptr;
35
36 static void log_timed_action (const char *const *action, double clock){
37   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38     char *name = xbt_str_join_array(action, " ");
39     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
40     xbt_free(name);
41   }
42 }
43
44 static std::vector<MPI_Request>* get_reqq_self()
45 {
46   return reqq.at(Actor::self()->getPid());
47 }
48
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 {
51    reqq.insert({Actor::self()->getPid(), mpi_request});
52 }
53
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
56 {
57   if (not smpi_process()->replaying())
58     return xbt_malloc(size);
59   if (sendbuffer_size<size){
60     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
61     sendbuffer_size=size;
62   }
63   return sendbuffer;
64 }
65
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (not smpi_process()->replaying())
69     return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
72     recvbuffer_size=size;
73   }
74   return recvbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (not smpi_process()->replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   char *endptr;
86   double value = strtod(string, &endptr);
87   if (*endptr != '\0')
88     THROWF(unknown_error, 0, "%s is not a double", string);
89   return value;
90 }
91
92
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
94 static MPI_Datatype decode_datatype(const char *const action)
95 {
96   switch(atoi(action)) {
97     case 0:
98       MPI_CURRENT_TYPE=MPI_DOUBLE;
99       break;
100     case 1:
101       MPI_CURRENT_TYPE=MPI_INT;
102       break;
103     case 2:
104       MPI_CURRENT_TYPE=MPI_CHAR;
105       break;
106     case 3:
107       MPI_CURRENT_TYPE=MPI_SHORT;
108       break;
109     case 4:
110       MPI_CURRENT_TYPE=MPI_LONG;
111       break;
112     case 5:
113       MPI_CURRENT_TYPE=MPI_FLOAT;
114       break;
115     case 6:
116       MPI_CURRENT_TYPE=MPI_BYTE;
117       break;
118     default:
119       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
120       break;
121   }
122    return MPI_CURRENT_TYPE;
123 }
124
125 const char* encode_datatype(MPI_Datatype datatype)
126 {
127   if (datatype==MPI_BYTE)
128       return "";
129   if(datatype==MPI_DOUBLE)
130       return "0";
131   if(datatype==MPI_INT)
132       return "1";
133   if(datatype==MPI_CHAR)
134       return "2";
135   if(datatype==MPI_SHORT)
136       return "3";
137   if(datatype==MPI_LONG)
138     return "4";
139   if(datatype==MPI_FLOAT)
140       return "5";
141   // default - not implemented.
142   // do not warn here as we pass in this function even for other trace formats
143   return "-1";
144 }
145
146 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
147     int i=0;\
148     while(action[i]!=nullptr)\
149      i++;\
150     if(i<mandatory+2)                                           \
151     THROWF(arg_error, 0, "%s replay failed.\n" \
152           "%d items were given on the line. First two should be process_id and action.  " \
153           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
154           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
155   }
156
157 namespace simgrid {
158 namespace smpi {
159
160 static void action_init(const char *const *action)
161 {
162   XBT_DEBUG("Initialize the counters");
163   CHECK_ACTION_PARAMS(action, 0, 1)
164   if(action[2])
165     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
166   else
167     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
168
169   /* start a simulated timer */
170   smpi_process()->simulated_start();
171   /*initialize the number of active processes */
172   active_processes = smpi_process_count();
173
174   set_reqq_self(new std::vector<MPI_Request>);
175 }
176
177 static void action_finalize(const char *const *action)
178 {
179   /* Nothing to do */
180 }
181
182 static void action_comm_size(const char *const *action)
183 {
184   communicator_size = parse_double(action[2]);
185   log_timed_action (action, smpi_process()->simulated_elapsed());
186 }
187
188 static void action_comm_split(const char *const *action)
189 {
190   log_timed_action (action, smpi_process()->simulated_elapsed());
191 }
192
193 static void action_comm_dup(const char *const *action)
194 {
195   log_timed_action (action, smpi_process()->simulated_elapsed());
196 }
197
198 static void action_compute(const char *const *action)
199 {
200   CHECK_ACTION_PARAMS(action, 1, 0)
201   double clock = smpi_process()->simulated_elapsed();
202   double flops= parse_double(action[2]);
203   int my_proc_id = Actor::self()->getPid();
204
205   TRACE_smpi_computing_in(my_proc_id, flops);
206   smpi_execute_flops(flops);
207   TRACE_smpi_computing_out(my_proc_id);
208
209   log_timed_action (action, clock);
210 }
211
212 static void action_send(const char *const *action)
213 {
214   CHECK_ACTION_PARAMS(action, 2, 1)
215   int to = atoi(action[2]);
216   double size=parse_double(action[3]);
217   double clock = smpi_process()->simulated_elapsed();
218
219   if(action[4])
220     MPI_CURRENT_TYPE=decode_datatype(action[4]);
221   else
222     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
223
224   int my_proc_id = Actor::self()->getPid();
225   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
226
227   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
228                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
229   if (not TRACE_smpi_view_internals())
230     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
231
232   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
233
234   TRACE_smpi_comm_out(my_proc_id);
235
236   log_timed_action(action, clock);
237 }
238
239 static void action_Isend(const char *const *action)
240 {
241   CHECK_ACTION_PARAMS(action, 2, 1)
242   int to = atoi(action[2]);
243   double size=parse_double(action[3]);
244   double clock = smpi_process()->simulated_elapsed();
245
246   if(action[4])
247     MPI_CURRENT_TYPE=decode_datatype(action[4]);
248   else
249     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250
251   int my_proc_id = Actor::self()->getPid();
252   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
253   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
254                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
255   if (not TRACE_smpi_view_internals())
256     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
257
258   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
259
260   TRACE_smpi_comm_out(my_proc_id);
261
262   get_reqq_self()->push_back(request);
263
264   log_timed_action (action, clock);
265 }
266
267 static void action_recv(const char *const *action) {
268   CHECK_ACTION_PARAMS(action, 2, 1)
269   int from = atoi(action[2]);
270   double size=parse_double(action[3]);
271   double clock = smpi_process()->simulated_elapsed();
272   MPI_Status status;
273
274   if(action[4])
275     MPI_CURRENT_TYPE=decode_datatype(action[4]);
276   else
277     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279   int my_proc_id = Actor::self()->getPid();
280   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
281
282   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
283                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
284
285   //unknown size from the receiver point of view
286   if (size <= 0.0) {
287     Request::probe(from, 0, MPI_COMM_WORLD, &status);
288     size=status.count;
289   }
290
291   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
292
293   TRACE_smpi_comm_out(my_proc_id);
294   if (not TRACE_smpi_view_internals()) {
295     TRACE_smpi_recv(src_traced, my_proc_id, 0);
296   }
297
298   log_timed_action (action, clock);
299 }
300
301 static void action_Irecv(const char *const *action)
302 {
303   CHECK_ACTION_PARAMS(action, 2, 1)
304   int from = atoi(action[2]);
305   double size=parse_double(action[3]);
306   double clock = smpi_process()->simulated_elapsed();
307
308   if(action[4])
309     MPI_CURRENT_TYPE=decode_datatype(action[4]);
310   else
311     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
312
313   int my_proc_id = Actor::self()->getPid();
314   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
315                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
316   MPI_Status status;
317   //unknow size from the receiver pov
318   if (size <= 0.0) {
319     Request::probe(from, 0, MPI_COMM_WORLD, &status);
320     size = status.count;
321   }
322
323   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
324
325   TRACE_smpi_comm_out(my_proc_id);
326   get_reqq_self()->push_back(request);
327
328   log_timed_action (action, clock);
329 }
330
331 static void action_test(const char* const* action)
332 {
333   CHECK_ACTION_PARAMS(action, 0, 0)
334   double clock = smpi_process()->simulated_elapsed();
335   MPI_Status status;
336
337   MPI_Request request = get_reqq_self()->back();
338   get_reqq_self()->pop_back();
339   //if request is null here, this may mean that a previous test has succeeded
340   //Different times in traced application and replayed version may lead to this
341   //In this case, ignore the extra calls.
342   if(request!=nullptr){
343     int my_proc_id = Actor::self()->getPid();
344     TRACE_smpi_testing_in(my_proc_id);
345
346     int flag = Request::test(&request, &status);
347
348     XBT_DEBUG("MPI_Test result: %d", flag);
349     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
350     get_reqq_self()->push_back(request);
351
352     TRACE_smpi_testing_out(my_proc_id);
353   }
354   log_timed_action (action, clock);
355 }
356
357 static void action_wait(const char *const *action){
358   CHECK_ACTION_PARAMS(action, 0, 0)
359   double clock = smpi_process()->simulated_elapsed();
360   MPI_Status status;
361
362   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
363       xbt_str_join_array(action," "));
364   MPI_Request request = get_reqq_self()->back();
365   get_reqq_self()->pop_back();
366
367   if (request==nullptr){
368     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
369     return;
370   }
371
372   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
373
374   MPI_Group group = request->comm()->group();
375   int src_traced = group->rank(request->src());
376   int dst_traced = group->rank(request->dst());
377   int is_wait_for_receive = (request->flags() & RECV);
378   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
379
380   Request::wait(&request, &status);
381
382   TRACE_smpi_comm_out(rank);
383   if (is_wait_for_receive)
384     TRACE_smpi_recv(src_traced, dst_traced, 0);
385   log_timed_action (action, clock);
386 }
387
388 static void action_waitall(const char *const *action){
389   CHECK_ACTION_PARAMS(action, 0, 0)
390   double clock = smpi_process()->simulated_elapsed();
391   const unsigned int count_requests = get_reqq_self()->size();
392
393   if (count_requests>0) {
394     MPI_Status status[count_requests];
395
396    int my_proc_id_traced = Actor::self()->getPid();
397    TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
398    int recvs_snd[count_requests];
399    int recvs_rcv[count_requests];
400    for (unsigned int i = 0; i < count_requests; i++) {
401      const auto& req = (*get_reqq_self())[i];
402      if (req && (req->flags () & RECV)){
403        recvs_snd[i]=req->src();
404        recvs_rcv[i]=req->dst();
405      }else
406        recvs_snd[i]=-100;
407    }
408    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
409
410    for (unsigned i = 0; i < count_requests; i++) {
411      if (recvs_snd[i]!=-100)
412        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
413    }
414    TRACE_smpi_comm_out(my_proc_id_traced);
415   }
416   log_timed_action (action, clock);
417 }
418
419 static void action_barrier(const char *const *action){
420   double clock = smpi_process()->simulated_elapsed();
421   int my_proc_id = Actor::self()->getPid();
422   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
423
424   Colls::barrier(MPI_COMM_WORLD);
425
426   TRACE_smpi_comm_out(my_proc_id);
427   log_timed_action (action, clock);
428 }
429
430 static void action_bcast(const char *const *action)
431 {
432   CHECK_ACTION_PARAMS(action, 1, 2)
433   double size = parse_double(action[2]);
434   double clock = smpi_process()->simulated_elapsed();
435   int root=0;
436   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
437   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
438
439   if(action[3]) {
440     root= atoi(action[3]);
441     if(action[4])
442       MPI_CURRENT_TYPE=decode_datatype(action[4]);
443   }
444
445   int my_proc_id = Actor::self()->getPid();
446   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
447                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size, -1,
448                                                     encode_datatype(MPI_CURRENT_TYPE), ""));
449
450   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
451
452   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
453
454   TRACE_smpi_comm_out(my_proc_id);
455   log_timed_action (action, clock);
456 }
457
458 static void action_reduce(const char *const *action)
459 {
460   CHECK_ACTION_PARAMS(action, 2, 2)
461   double comm_size = parse_double(action[2]);
462   double comp_size = parse_double(action[3]);
463   double clock = smpi_process()->simulated_elapsed();
464   int root=0;
465   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
466
467   if(action[4]) {
468     root= atoi(action[4]);
469     if(action[5])
470       MPI_CURRENT_TYPE=decode_datatype(action[5]);
471   }
472
473   int my_proc_id = Actor::self()->getPid();
474   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
475                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
476                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
477
478   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
479   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
481   smpi_execute_flops(comp_size);
482
483   TRACE_smpi_comm_out(my_proc_id);
484   log_timed_action (action, clock);
485 }
486
487 static void action_allReduce(const char *const *action) {
488   CHECK_ACTION_PARAMS(action, 2, 1)
489   double comm_size = parse_double(action[2]);
490   double comp_size = parse_double(action[3]);
491
492   if(action[4])
493     MPI_CURRENT_TYPE=decode_datatype(action[4]);
494   else
495     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
496
497   double clock = smpi_process()->simulated_elapsed();
498   int my_proc_id = Actor::self()->getPid();
499   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
500                                                                         encode_datatype(MPI_CURRENT_TYPE), ""));
501
502   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
503   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
504   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
505   smpi_execute_flops(comp_size);
506
507   TRACE_smpi_comm_out(my_proc_id);
508   log_timed_action (action, clock);
509 }
510
511 static void action_allToAll(const char *const *action) {
512   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
513   double clock = smpi_process()->simulated_elapsed();
514   int comm_size = MPI_COMM_WORLD->size();
515   int send_size = parse_double(action[2]);
516   int recv_size = parse_double(action[3]);
517   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
518
519   if(action[4] && action[5]) {
520     MPI_CURRENT_TYPE=decode_datatype(action[4]);
521     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
522   }
523   else
524     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
525
526   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
527   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
528
529   int my_proc_id = Actor::self()->getPid();
530   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
531                                                                         encode_datatype(MPI_CURRENT_TYPE),
532                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
533
534   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
535
536   TRACE_smpi_comm_out(my_proc_id);
537   log_timed_action (action, clock);
538 }
539
540 static void action_gather(const char *const *action) {
541   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
542         0 gather 68 68 0 0 0
543       where:
544         1) 68 is the sendcounts
545         2) 68 is the recvcounts
546         3) 0 is the root node
547         4) 0 is the send datatype id, see decode_datatype()
548         5) 0 is the recv datatype id, see decode_datatype()
549   */
550   CHECK_ACTION_PARAMS(action, 2, 3)
551   double clock = smpi_process()->simulated_elapsed();
552   int comm_size = MPI_COMM_WORLD->size();
553   int send_size = parse_double(action[2]);
554   int recv_size = parse_double(action[3]);
555   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
556   if(action[4] && action[5]) {
557     MPI_CURRENT_TYPE=decode_datatype(action[5]);
558     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
559   } else {
560     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
561   }
562   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
563   void *recv = nullptr;
564   int root=0;
565   if(action[4])
566     root=atoi(action[4]);
567   int rank = MPI_COMM_WORLD->rank();
568
569   if(rank==root)
570     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
571
572   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
573                                                                         encode_datatype(MPI_CURRENT_TYPE),
574                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
575
576   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
577
578   TRACE_smpi_comm_out(Actor::self()->getPid());
579   log_timed_action (action, clock);
580 }
581
582 static void action_scatter(const char* const* action)
583 {
584   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
585         0 gather 68 68 0 0 0
586       where:
587         1) 68 is the sendcounts
588         2) 68 is the recvcounts
589         3) 0 is the root node
590         4) 0 is the send datatype id, see decode_datatype()
591         5) 0 is the recv datatype id, see decode_datatype()
592   */
593   CHECK_ACTION_PARAMS(action, 2, 3)
594   double clock                   = smpi_process()->simulated_elapsed();
595   int comm_size                  = MPI_COMM_WORLD->size();
596   int send_size                  = parse_double(action[2]);
597   int recv_size                  = parse_double(action[3]);
598   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
599   if (action[4] && action[5]) {
600     MPI_CURRENT_TYPE  = decode_datatype(action[5]);
601     MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
602   } else {
603     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
604   }
605   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
606   void* recv = nullptr;
607   int root   = 0;
608   if (action[4])
609     root   = atoi(action[4]);
610   int rank = MPI_COMM_WORLD->rank();
611
612   if (rank == root)
613     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
614
615   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
616                                                                         encode_datatype(MPI_CURRENT_TYPE),
617                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
618
619   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
620
621   TRACE_smpi_comm_out(Actor::self()->getPid());
622   log_timed_action(action, clock);
623 }
624
625 static void action_gatherv(const char *const *action) {
626   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
627        0 gather 68 68 10 10 10 0 0 0
628      where:
629        1) 68 is the sendcount
630        2) 68 10 10 10 is the recvcounts
631        3) 0 is the root node
632        4) 0 is the send datatype id, see decode_datatype()
633        5) 0 is the recv datatype id, see decode_datatype()
634   */
635   double clock = smpi_process()->simulated_elapsed();
636   int comm_size = MPI_COMM_WORLD->size();
637   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
638   int send_size = parse_double(action[2]);
639   int disps[comm_size];
640   int recvcounts[comm_size];
641   int recv_sum=0;
642
643   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
644   if(action[4+comm_size] && action[5+comm_size]) {
645     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
646     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
647   } else
648     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
649
650   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
651   void *recv = nullptr;
652   for(int i=0;i<comm_size;i++) {
653     recvcounts[i] = atoi(action[i+3]);
654     recv_sum=recv_sum+recvcounts[i];
655     disps[i]=0;
656   }
657
658   int root=atoi(action[3+comm_size]);
659   int rank = MPI_COMM_WORLD->rank();
660
661   if(rank==root)
662     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
663
664   std::vector<int>* trace_recvcounts = new std::vector<int>;
665   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
666     trace_recvcounts->push_back(recvcounts[i]);
667
668   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
669                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
670                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
671
672   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
673
674   TRACE_smpi_comm_out(Actor::self()->getPid());
675   log_timed_action (action, clock);
676 }
677
678 static void action_scatterv(const char* const* action)
679 {
680   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
681        0 gather 68 10 10 10 68 0 0 0
682      where:
683        1) 68 10 10 10 is the sendcounts
684        2) 68 is the recvcount
685        3) 0 is the root node
686        4) 0 is the send datatype id, see decode_datatype()
687        5) 0 is the recv datatype id, see decode_datatype()
688   */
689   double clock  = smpi_process()->simulated_elapsed();
690   int comm_size = MPI_COMM_WORLD->size();
691   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
692   int recv_size = parse_double(action[2 + comm_size]);
693   int disps[comm_size];
694   int sendcounts[comm_size];
695   int send_sum = 0;
696
697   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
698   if (action[4 + comm_size] && action[5 + comm_size]) {
699     MPI_CURRENT_TYPE  = decode_datatype(action[4 + comm_size]);
700     MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
701   } else
702     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
703
704   void* send = nullptr;
705   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
706   for (int i = 0; i < comm_size; i++) {
707     sendcounts[i] = atoi(action[i + 2]);
708     send_sum += sendcounts[i];
709     disps[i] = 0;
710   }
711
712   int root = atoi(action[3 + comm_size]);
713   int rank = MPI_COMM_WORLD->rank();
714
715   if (rank == root)
716     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
717
718   std::vector<int>* trace_sendcounts = new std::vector<int>;
719   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
720     trace_sendcounts->push_back(sendcounts[i]);
721
722   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
723                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
724                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
725
726   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
727
728   TRACE_smpi_comm_out(Actor::self()->getPid());
729   log_timed_action(action, clock);
730 }
731
732 static void action_reducescatter(const char *const *action) {
733  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
734       0 reduceScatter 275427 275427 275427 204020 11346849 0
735     where:
736       1) The first four values after the name of the action declare the recvcounts array
737       2) The value 11346849 is the amount of instructions
738       3) The last value corresponds to the datatype, see decode_datatype().
739 */
740   double clock = smpi_process()->simulated_elapsed();
741   int comm_size = MPI_COMM_WORLD->size();
742   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
743   int comp_size = parse_double(action[2+comm_size]);
744   int recvcounts[comm_size];
745   int my_proc_id = Actor::self()->getPid();
746   int size = 0;
747   std::vector<int>* trace_recvcounts = new std::vector<int>;
748   if(action[3+comm_size])
749     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
750   else
751     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
752
753   for(int i=0;i<comm_size;i++) {
754     recvcounts[i] = atoi(action[i+2]);
755     trace_recvcounts->push_back(recvcounts[i]);
756     size+=recvcounts[i];
757   }
758
759   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
760                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
761                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
762                                                        encode_datatype(MPI_CURRENT_TYPE)));
763
764   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
765   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
766
767   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
768   smpi_execute_flops(comp_size);
769
770   TRACE_smpi_comm_out(my_proc_id);
771   log_timed_action (action, clock);
772 }
773
774 static void action_allgather(const char *const *action) {
775   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
776         0 allGather 275427 275427
777     where:
778         1) 275427 is the sendcount
779         2) 275427 is the recvcount
780         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
781   */
782   double clock = smpi_process()->simulated_elapsed();
783
784   CHECK_ACTION_PARAMS(action, 2, 2)
785   int sendcount=atoi(action[2]);
786   int recvcount=atoi(action[3]);
787
788   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
789
790   if(action[4] && action[5]) {
791     MPI_CURRENT_TYPE = decode_datatype(action[4]);
792     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
793   } else
794     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
795
796   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
797   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
798
799   int my_proc_id = Actor::self()->getPid();
800
801   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
802                                                                         encode_datatype(MPI_CURRENT_TYPE),
803                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
804
805   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
806
807   TRACE_smpi_comm_out(my_proc_id);
808   log_timed_action (action, clock);
809 }
810
811 static void action_allgatherv(const char *const *action) {
812   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
813         0 allGatherV 275427 275427 275427 275427 204020
814      where:
815         1) 275427 is the sendcount
816         2) The next four elements declare the recvcounts array
817         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
818   */
819   double clock = smpi_process()->simulated_elapsed();
820
821   int comm_size = MPI_COMM_WORLD->size();
822   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
823   int sendcount=atoi(action[2]);
824   int recvcounts[comm_size];
825   int disps[comm_size];
826   int recv_sum=0;
827   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
828
829   if(action[3+comm_size] && action[4+comm_size]) {
830     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
831     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
832   } else
833     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
834
835   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
836
837   for(int i=0;i<comm_size;i++) {
838     recvcounts[i] = atoi(action[i+3]);
839     recv_sum=recv_sum+recvcounts[i];
840     disps[i] = 0;
841   }
842   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
843
844   int my_proc_id = Actor::self()->getPid();
845
846   std::vector<int>* trace_recvcounts = new std::vector<int>;
847   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
848     trace_recvcounts->push_back(recvcounts[i]);
849
850   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::VarCollTIData(
851                                              "allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
852                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
853
854   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
855                           MPI_COMM_WORLD);
856
857   TRACE_smpi_comm_out(my_proc_id);
858   log_timed_action (action, clock);
859 }
860
861 static void action_allToAllv(const char *const *action) {
862   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
863         0 allToAllV 100 1 7 10 12 100 1 70 10 5
864      where:
865         1) 100 is the size of the send buffer *sizeof(int),
866         2) 1 7 10 12 is the sendcounts array
867         3) 100*sizeof(int) is the size of the receiver buffer
868         4)  1 70 10 5 is the recvcounts array
869   */
870   double clock = smpi_process()->simulated_elapsed();
871
872   int comm_size = MPI_COMM_WORLD->size();
873   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
874   int send_size = 0;
875   int recv_size = 0;
876   int sendcounts[comm_size];
877   std::vector<int>* trace_sendcounts = new std::vector<int>;
878   int recvcounts[comm_size];
879   std::vector<int>* trace_recvcounts = new std::vector<int>;
880   int senddisps[comm_size];
881   int recvdisps[comm_size];
882
883   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
884
885   int send_buf_size=parse_double(action[2]);
886   int recv_buf_size=parse_double(action[3+comm_size]);
887   if(action[4+2*comm_size] && action[5+2*comm_size]) {
888     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
889     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
890   }
891   else
892     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
893
894   int my_proc_id       = Actor::self()->getPid();
895   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
896   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
897
898   for(int i=0;i<comm_size;i++) {
899     sendcounts[i] = atoi(action[i+3]);
900     trace_sendcounts->push_back(sendcounts[i]);
901     send_size += sendcounts[i];
902     recvcounts[i] = atoi(action[i+4+comm_size]);
903     trace_recvcounts->push_back(recvcounts[i]);
904     recv_size += recvcounts[i];
905     senddisps[i] = 0;
906     recvdisps[i] = 0;
907   }
908
909   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::VarCollTIData(
910                                              "allToAllV", -1, send_size, trace_sendcounts, recv_size, trace_recvcounts,
911                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
912
913   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
914                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
915
916   TRACE_smpi_comm_out(my_proc_id);
917   log_timed_action (action, clock);
918 }
919
920 }} // namespace simgrid::smpi
921
922 /** @brief Only initialize the replay, don't do it for real */
923 void smpi_replay_init(int* argc, char*** argv)
924 {
925   simgrid::smpi::Process::init(argc, argv);
926   smpi_process()->mark_as_initialized();
927   smpi_process()->set_replaying(true);
928
929   int my_proc_id = Actor::self()->getPid();
930   TRACE_smpi_init(my_proc_id);
931   TRACE_smpi_computing_init(my_proc_id);
932   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
933   TRACE_smpi_comm_out(my_proc_id);
934   xbt_replay_action_register("init",       simgrid::smpi::action_init);
935   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
936   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
937   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
938   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
939   xbt_replay_action_register("send",       simgrid::smpi::action_send);
940   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
941   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
942   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
943   xbt_replay_action_register("test",       simgrid::smpi::action_test);
944   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
945   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
946   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
947   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
948   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
949   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
950   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
951   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
952   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
953   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
954   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
955   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
956   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
957   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
958   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
959   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
960
961   //if we have a delayed start, sleep here.
962   if(*argc>2){
963     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
964     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
965     smpi_execute_flops(value);
966   } else {
967     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
968     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
969     smpi_execute_flops(0.0);
970   }
971 }
972
973 /** @brief actually run the replay after initialization */
974 void smpi_replay_main(int* argc, char*** argv)
975 {
976   simgrid::xbt::replay_runner(*argc, *argv);
977
978   /* and now, finalize everything */
979   /* One active process will stop. Decrease the counter*/
980   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
981   if (not get_reqq_self()->empty()) {
982     unsigned int count_requests=get_reqq_self()->size();
983     MPI_Request requests[count_requests];
984     MPI_Status status[count_requests];
985     unsigned int i=0;
986
987     for (auto const& req : *get_reqq_self()) {
988       requests[i] = req;
989       i++;
990     }
991     simgrid::smpi::Request::waitall(count_requests, requests, status);
992   }
993   delete get_reqq_self();
994   active_processes--;
995
996   if(active_processes==0){
997     /* Last process alive speaking: end the simulated timer */
998     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
999     xbt_free(sendbuffer);
1000     xbt_free(recvbuffer);
1001   }
1002
1003   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1004
1005   smpi_process()->finalize();
1006
1007   TRACE_smpi_comm_out(Actor::self()->getPid());
1008   TRACE_smpi_finalize(Actor::self()->getPid());
1009 }
1010
1011 /** @brief chain a replay initialization and a replay start */
1012 void smpi_replay_run(int* argc, char*** argv)
1013 {
1014   smpi_replay_init(argc, argv);
1015   smpi_replay_main(argc, argv);
1016 }