Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
17a6dfdd344a26a7b112ae579ebd63ef26cec851
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 static int sendbuffer_size = 0;
31 static char* sendbuffer    = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer    = nullptr;
34
35 static void log_timed_action (const char *const *action, double clock){
36   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
37     char *name = xbt_str_join_array(action, " ");
38     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
39     xbt_free(name);
40   }
41 }
42
43 static std::vector<MPI_Request>* get_reqq_self()
44 {
45   return reqq.at(Actor::self()->getPid());
46 }
47
48 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 {
50    reqq.insert({Actor::self()->getPid(), mpi_request});
51 }
52
53 //allocate a single buffer for all sends, growing it if needed
54 void* smpi_get_tmp_sendbuffer(int size)
55 {
56   if (not smpi_process()->replaying())
57     return xbt_malloc(size);
58   if (sendbuffer_size<size){
59     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
60     sendbuffer_size=size;
61   }
62   return sendbuffer;
63 }
64
65 //allocate a single buffer for all recv
66 void* smpi_get_tmp_recvbuffer(int size){
67   if (not smpi_process()->replaying())
68     return xbt_malloc(size);
69   if (recvbuffer_size<size){
70     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
71     recvbuffer_size=size;
72   }
73   return recvbuffer;
74 }
75
76 void smpi_free_tmp_buffer(void* buf){
77   if (not smpi_process()->replaying())
78     xbt_free(buf);
79 }
80
81 /* Helper function */
82 static double parse_double(const char *string)
83 {
84   char *endptr;
85   double value = strtod(string, &endptr);
86   if (*endptr != '\0')
87     THROWF(unknown_error, 0, "%s is not a double", string);
88   return value;
89 }
90
91
92 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
93 static MPI_Datatype decode_datatype(const char *const action)
94 {
95   switch(atoi(action)) {
96     case 0:
97       return MPI_DOUBLE;
98       break;
99     case 1:
100       return MPI_INT;
101       break;
102     case 2:
103       return MPI_CHAR;
104       break;
105     case 3:
106       return MPI_SHORT;
107       break;
108     case 4:
109       return MPI_LONG;
110       break;
111     case 5:
112       return MPI_FLOAT;
113       break;
114     case 6:
115       return MPI_BYTE;
116       break;
117     default:
118       return MPI_DEFAULT_TYPE;
119       break;
120   }
121 }
122
123 const char* encode_datatype(MPI_Datatype datatype)
124 {
125   if (datatype==MPI_BYTE)
126       return "";
127   if(datatype==MPI_DOUBLE)
128       return "0";
129   if(datatype==MPI_INT)
130       return "1";
131   if(datatype==MPI_CHAR)
132       return "2";
133   if(datatype==MPI_SHORT)
134       return "3";
135   if(datatype==MPI_LONG)
136     return "4";
137   if(datatype==MPI_FLOAT)
138       return "5";
139   // default - not implemented.
140   // do not warn here as we pass in this function even for other trace formats
141   return "-1";
142 }
143
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145     int i=0;\
146     while(action[i]!=nullptr)\
147      i++;\
148     if(i<mandatory+2)                                           \
149     THROWF(arg_error, 0, "%s replay failed.\n" \
150           "%d items were given on the line. First two should be process_id and action.  " \
151           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
153   }
154
155 namespace simgrid {
156 namespace smpi {
157
158 static void action_init(const char *const *action)
159 {
160   XBT_DEBUG("Initialize the counters");
161   CHECK_ACTION_PARAMS(action, 0, 1)
162   if(action[2])
163     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
164   else
165     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process()->simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* Nothing to do */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   communicator_size = parse_double(action[2]);
183   log_timed_action (action, smpi_process()->simulated_elapsed());
184 }
185
186 static void action_comm_split(const char *const *action)
187 {
188   log_timed_action (action, smpi_process()->simulated_elapsed());
189 }
190
191 static void action_comm_dup(const char *const *action)
192 {
193   log_timed_action (action, smpi_process()->simulated_elapsed());
194 }
195
196 static void action_compute(const char *const *action)
197 {
198   CHECK_ACTION_PARAMS(action, 1, 0)
199   double clock = smpi_process()->simulated_elapsed();
200   double flops= parse_double(action[2]);
201   int my_proc_id = Actor::self()->getPid();
202
203   TRACE_smpi_computing_in(my_proc_id, flops);
204   smpi_execute_flops(flops);
205   TRACE_smpi_computing_out(my_proc_id);
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_send(const char *const *action)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   int to = atoi(action[2]);
214   double size=parse_double(action[3]);
215   double clock = smpi_process()->simulated_elapsed();
216
217   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
218
219   int my_proc_id = Actor::self()->getPid();
220   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
221
222   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
223                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
224   if (not TRACE_smpi_view_internals())
225     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
226
227   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
228
229   TRACE_smpi_comm_out(my_proc_id);
230
231   log_timed_action(action, clock);
232 }
233
234 static void action_Isend(const char *const *action)
235 {
236   CHECK_ACTION_PARAMS(action, 2, 1)
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process()->simulated_elapsed();
240
241   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
242
243   int my_proc_id = Actor::self()->getPid();
244   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
245   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
246                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
247   if (not TRACE_smpi_view_internals())
248     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
249
250   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
251
252   TRACE_smpi_comm_out(my_proc_id);
253
254   get_reqq_self()->push_back(request);
255
256   log_timed_action (action, clock);
257 }
258
259 static void action_recv(const char *const *action) {
260   CHECK_ACTION_PARAMS(action, 2, 1)
261   int from = atoi(action[2]);
262   double size=parse_double(action[3]);
263   double clock = smpi_process()->simulated_elapsed();
264   MPI_Status status;
265
266   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
267
268   int my_proc_id = Actor::self()->getPid();
269   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
270
271   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
272                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
273
274   //unknown size from the receiver point of view
275   if (size <= 0.0) {
276     Request::probe(from, 0, MPI_COMM_WORLD, &status);
277     size=status.count;
278   }
279
280   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
281
282   TRACE_smpi_comm_out(my_proc_id);
283   if (not TRACE_smpi_view_internals()) {
284     TRACE_smpi_recv(src_traced, my_proc_id, 0);
285   }
286
287   log_timed_action (action, clock);
288 }
289
290 static void action_Irecv(const char *const *action)
291 {
292   CHECK_ACTION_PARAMS(action, 2, 1)
293   int from = atoi(action[2]);
294   double size=parse_double(action[3]);
295   double clock = smpi_process()->simulated_elapsed();
296
297   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
298
299   int my_proc_id = Actor::self()->getPid();
300   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
301                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
302   MPI_Status status;
303   //unknow size from the receiver pov
304   if (size <= 0.0) {
305     Request::probe(from, 0, MPI_COMM_WORLD, &status);
306     size = status.count;
307   }
308
309   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
310
311   TRACE_smpi_comm_out(my_proc_id);
312   get_reqq_self()->push_back(request);
313
314   log_timed_action (action, clock);
315 }
316
317 static void action_test(const char* const* action)
318 {
319   CHECK_ACTION_PARAMS(action, 0, 0)
320   double clock = smpi_process()->simulated_elapsed();
321   MPI_Status status;
322
323   MPI_Request request = get_reqq_self()->back();
324   get_reqq_self()->pop_back();
325   //if request is null here, this may mean that a previous test has succeeded
326   //Different times in traced application and replayed version may lead to this
327   //In this case, ignore the extra calls.
328   if(request!=nullptr){
329     int my_proc_id = Actor::self()->getPid();
330     TRACE_smpi_testing_in(my_proc_id);
331
332     int flag = Request::test(&request, &status);
333
334     XBT_DEBUG("MPI_Test result: %d", flag);
335     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
336     get_reqq_self()->push_back(request);
337
338     TRACE_smpi_testing_out(my_proc_id);
339   }
340   log_timed_action (action, clock);
341 }
342
343 static void action_wait(const char *const *action){
344   CHECK_ACTION_PARAMS(action, 0, 0)
345   double clock = smpi_process()->simulated_elapsed();
346   MPI_Status status;
347
348   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
349       xbt_str_join_array(action," "));
350   MPI_Request request = get_reqq_self()->back();
351   get_reqq_self()->pop_back();
352
353   if (request==nullptr){
354     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
355     return;
356   }
357
358   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
359
360   MPI_Group group = request->comm()->group();
361   int src_traced = group->rank(request->src());
362   int dst_traced = group->rank(request->dst());
363   int is_wait_for_receive = (request->flags() & RECV);
364   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
365
366   Request::wait(&request, &status);
367
368   TRACE_smpi_comm_out(rank);
369   if (is_wait_for_receive)
370     TRACE_smpi_recv(src_traced, dst_traced, 0);
371   log_timed_action (action, clock);
372 }
373
374 static void action_waitall(const char *const *action){
375   CHECK_ACTION_PARAMS(action, 0, 0)
376   double clock = smpi_process()->simulated_elapsed();
377   const unsigned int count_requests = get_reqq_self()->size();
378
379   if (count_requests>0) {
380     MPI_Status status[count_requests];
381
382     int my_proc_id_traced = Actor::self()->getPid();
383     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
384                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
385     int recvs_snd[count_requests];
386     int recvs_rcv[count_requests];
387     for (unsigned int i = 0; i < count_requests; i++) {
388       const auto& req = (*get_reqq_self())[i];
389       if (req && (req->flags() & RECV)) {
390         recvs_snd[i] = req->src();
391         recvs_rcv[i] = req->dst();
392       } else
393         recvs_snd[i] = -100;
394    }
395    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
396
397    for (unsigned i = 0; i < count_requests; i++) {
398      if (recvs_snd[i]!=-100)
399        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
400    }
401    TRACE_smpi_comm_out(my_proc_id_traced);
402   }
403   log_timed_action (action, clock);
404 }
405
406 static void action_barrier(const char *const *action){
407   double clock = smpi_process()->simulated_elapsed();
408   int my_proc_id = Actor::self()->getPid();
409   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
410
411   Colls::barrier(MPI_COMM_WORLD);
412
413   TRACE_smpi_comm_out(my_proc_id);
414   log_timed_action (action, clock);
415 }
416
417 static void action_bcast(const char *const *action)
418 {
419   CHECK_ACTION_PARAMS(action, 1, 2)
420   double size = parse_double(action[2]);
421   double clock = smpi_process()->simulated_elapsed();
422   int root     = (action[3]) ? atoi(action[3]) : 0;
423   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
424   MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
425
426   int my_proc_id = Actor::self()->getPid();
427   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
428                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
429                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
430
431   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
432
433   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
434
435   TRACE_smpi_comm_out(my_proc_id);
436   log_timed_action (action, clock);
437 }
438
439 static void action_reduce(const char *const *action)
440 {
441   CHECK_ACTION_PARAMS(action, 2, 2)
442   double comm_size = parse_double(action[2]);
443   double comp_size = parse_double(action[3]);
444   double clock = smpi_process()->simulated_elapsed();
445   int root         = (action[4]) ? atoi(action[4]) : 0;
446
447   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
448
449   int my_proc_id = Actor::self()->getPid();
450   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
451                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
452                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
453
454   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
455   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
457   smpi_execute_flops(comp_size);
458
459   TRACE_smpi_comm_out(my_proc_id);
460   log_timed_action (action, clock);
461 }
462
463 static void action_allReduce(const char *const *action) {
464   CHECK_ACTION_PARAMS(action, 2, 1)
465   double comm_size = parse_double(action[2]);
466   double comp_size = parse_double(action[3]);
467
468   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
469
470   double clock = smpi_process()->simulated_elapsed();
471   int my_proc_id = Actor::self()->getPid();
472   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
473                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
474
475   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
476   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
478   smpi_execute_flops(comp_size);
479
480   TRACE_smpi_comm_out(my_proc_id);
481   log_timed_action (action, clock);
482 }
483
484 static void action_allToAll(const char *const *action) {
485   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
486   double clock = smpi_process()->simulated_elapsed();
487   int comm_size = MPI_COMM_WORLD->size();
488   int send_size = parse_double(action[2]);
489   int recv_size = parse_double(action[3]);
490   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
491   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
492
493   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
494   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
495
496   int my_proc_id = Actor::self()->getPid();
497   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
498                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
499                                                     encode_datatype(MPI_CURRENT_TYPE),
500                                                     encode_datatype(MPI_CURRENT_TYPE2)));
501
502   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
503
504   TRACE_smpi_comm_out(my_proc_id);
505   log_timed_action (action, clock);
506 }
507
508 static void action_gather(const char *const *action) {
509   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
510         0 gather 68 68 0 0 0
511       where:
512         1) 68 is the sendcounts
513         2) 68 is the recvcounts
514         3) 0 is the root node
515         4) 0 is the send datatype id, see decode_datatype()
516         5) 0 is the recv datatype id, see decode_datatype()
517   */
518   CHECK_ACTION_PARAMS(action, 2, 3)
519   double clock = smpi_process()->simulated_elapsed();
520   int comm_size = MPI_COMM_WORLD->size();
521   int send_size = parse_double(action[2]);
522   int recv_size = parse_double(action[3]);
523   MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
524   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
525
526   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
527   void *recv = nullptr;
528   int root   = (action[4]) ? atoi(action[4]) : 0;
529   int rank = MPI_COMM_WORLD->rank();
530
531   if(rank==root)
532     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
533
534   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
535                                                                         encode_datatype(MPI_CURRENT_TYPE),
536                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
537
538   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
539
540   TRACE_smpi_comm_out(Actor::self()->getPid());
541   log_timed_action (action, clock);
542 }
543
544 static void action_scatter(const char* const* action)
545 {
546   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
547         0 gather 68 68 0 0 0
548       where:
549         1) 68 is the sendcounts
550         2) 68 is the recvcounts
551         3) 0 is the root node
552         4) 0 is the send datatype id, see decode_datatype()
553         5) 0 is the recv datatype id, see decode_datatype()
554   */
555   CHECK_ACTION_PARAMS(action, 2, 3)
556   double clock                   = smpi_process()->simulated_elapsed();
557   int comm_size                  = MPI_COMM_WORLD->size();
558   int send_size                  = parse_double(action[2]);
559   int recv_size                  = parse_double(action[3]);
560   MPI_Datatype MPI_CURRENT_TYPE  = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
561   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
562
563   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
564   void* recv = nullptr;
565   int root   = (action[4]) ? atoi(action[4]) : 0;
566   int rank = MPI_COMM_WORLD->rank();
567
568   if (rank == root)
569     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
570
571   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
572                                                                         encode_datatype(MPI_CURRENT_TYPE),
573                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
574
575   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
576
577   TRACE_smpi_comm_out(Actor::self()->getPid());
578   log_timed_action(action, clock);
579 }
580
581 static void action_gatherv(const char *const *action) {
582   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
583        0 gather 68 68 10 10 10 0 0 0
584      where:
585        1) 68 is the sendcount
586        2) 68 10 10 10 is the recvcounts
587        3) 0 is the root node
588        4) 0 is the send datatype id, see decode_datatype()
589        5) 0 is the recv datatype id, see decode_datatype()
590   */
591   double clock = smpi_process()->simulated_elapsed();
592   int comm_size = MPI_COMM_WORLD->size();
593   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
594   int send_size = parse_double(action[2]);
595   std::vector<int> disps(comm_size, 0);
596   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
597
598   MPI_Datatype MPI_CURRENT_TYPE =
599       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
600   MPI_Datatype MPI_CURRENT_TYPE2{
601       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
602
603   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
604   void *recv = nullptr;
605   for(int i=0;i<comm_size;i++) {
606     (*recvcounts)[i] = atoi(action[i + 3]);
607   }
608   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
609
610   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
611   int rank = MPI_COMM_WORLD->rank();
612
613   if(rank==root)
614     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
615
616   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
617                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
618                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
619
620   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
621                  MPI_COMM_WORLD);
622
623   TRACE_smpi_comm_out(Actor::self()->getPid());
624   log_timed_action (action, clock);
625 }
626
627 static void action_scatterv(const char* const* action)
628 {
629   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
630        0 gather 68 10 10 10 68 0 0 0
631      where:
632        1) 68 10 10 10 is the sendcounts
633        2) 68 is the recvcount
634        3) 0 is the root node
635        4) 0 is the send datatype id, see decode_datatype()
636        5) 0 is the recv datatype id, see decode_datatype()
637   */
638   double clock  = smpi_process()->simulated_elapsed();
639   int comm_size = MPI_COMM_WORLD->size();
640   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
641   int recv_size = parse_double(action[2 + comm_size]);
642   std::vector<int> disps(comm_size, 0);
643   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
644
645   MPI_Datatype MPI_CURRENT_TYPE =
646       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
647   MPI_Datatype MPI_CURRENT_TYPE2{
648       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
649
650   void* send = nullptr;
651   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
652   for (int i = 0; i < comm_size; i++) {
653     (*sendcounts)[i] = atoi(action[i + 2]);
654   }
655   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
656
657   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
658   int rank = MPI_COMM_WORLD->rank();
659
660   if (rank == root)
661     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
662
663   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
664                                                                            nullptr, encode_datatype(MPI_CURRENT_TYPE),
665                                                                            encode_datatype(MPI_CURRENT_TYPE2)));
666
667   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
668                   MPI_COMM_WORLD);
669
670   TRACE_smpi_comm_out(Actor::self()->getPid());
671   log_timed_action(action, clock);
672 }
673
674 static void action_reducescatter(const char *const *action) {
675  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
676       0 reduceScatter 275427 275427 275427 204020 11346849 0
677     where:
678       1) The first four values after the name of the action declare the recvcounts array
679       2) The value 11346849 is the amount of instructions
680       3) The last value corresponds to the datatype, see decode_datatype().
681 */
682   double clock = smpi_process()->simulated_elapsed();
683   int comm_size = MPI_COMM_WORLD->size();
684   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
685   int comp_size = parse_double(action[2+comm_size]);
686   int my_proc_id                     = Actor::self()->getPid();
687   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
688   MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
689
690   for(int i=0;i<comm_size;i++) {
691     recvcounts->push_back(atoi(action[i + 2]));
692   }
693   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
694
695   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
696                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
697                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
698                                                        encode_datatype(MPI_CURRENT_TYPE)));
699
700   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
701   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
702
703   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
704   smpi_execute_flops(comp_size);
705
706   TRACE_smpi_comm_out(my_proc_id);
707   log_timed_action (action, clock);
708 }
709
710 static void action_allgather(const char *const *action) {
711   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
712         0 allGather 275427 275427
713     where:
714         1) 275427 is the sendcount
715         2) 275427 is the recvcount
716         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
717   */
718   double clock = smpi_process()->simulated_elapsed();
719
720   CHECK_ACTION_PARAMS(action, 2, 2)
721   int sendcount=atoi(action[2]);
722   int recvcount=atoi(action[3]);
723
724   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
725   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
726
727   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
728   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
729
730   int my_proc_id = Actor::self()->getPid();
731
732   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
733                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
734                                                     encode_datatype(MPI_CURRENT_TYPE),
735                                                     encode_datatype(MPI_CURRENT_TYPE2)));
736
737   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
738
739   TRACE_smpi_comm_out(my_proc_id);
740   log_timed_action (action, clock);
741 }
742
743 static void action_allgatherv(const char *const *action) {
744   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
745         0 allGatherV 275427 275427 275427 275427 204020
746      where:
747         1) 275427 is the sendcount
748         2) The next four elements declare the recvcounts array
749         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
750   */
751   double clock = smpi_process()->simulated_elapsed();
752
753   int comm_size = MPI_COMM_WORLD->size();
754   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
755   int sendcount=atoi(action[2]);
756   int recvcounts[comm_size];
757   std::vector<int> disps(comm_size, 0);
758   int recv_sum=0;
759
760   MPI_Datatype MPI_CURRENT_TYPE =
761       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
762   MPI_Datatype MPI_CURRENT_TYPE2{
763       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
764
765   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
766
767   for(int i=0;i<comm_size;i++) {
768     recvcounts[i] = atoi(action[i+3]);
769     recv_sum=recv_sum+recvcounts[i];
770   }
771   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
772
773   int my_proc_id = Actor::self()->getPid();
774
775   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
776
777   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
778                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
779                                                        encode_datatype(MPI_CURRENT_TYPE),
780                                                        encode_datatype(MPI_CURRENT_TYPE2)));
781
782   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
783                           MPI_COMM_WORLD);
784
785   TRACE_smpi_comm_out(my_proc_id);
786   log_timed_action (action, clock);
787 }
788
789 static void action_allToAllv(const char *const *action) {
790   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
791         0 allToAllV 100 1 7 10 12 100 1 70 10 5
792      where:
793         1) 100 is the size of the send buffer *sizeof(int),
794         2) 1 7 10 12 is the sendcounts array
795         3) 100*sizeof(int) is the size of the receiver buffer
796         4)  1 70 10 5 is the recvcounts array
797   */
798   double clock = smpi_process()->simulated_elapsed();
799
800   int comm_size = MPI_COMM_WORLD->size();
801   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
802   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
803   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
804   std::vector<int> senddisps(comm_size, 0);
805   std::vector<int> recvdisps(comm_size, 0);
806
807   MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
808                                       ? decode_datatype(action[4 + 2 * comm_size])
809                                       : MPI_DEFAULT_TYPE;
810   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
811                                      ? decode_datatype(action[5 + 2 * comm_size])
812                                      : MPI_DEFAULT_TYPE};
813
814   int send_buf_size=parse_double(action[2]);
815   int recv_buf_size=parse_double(action[3+comm_size]);
816   int my_proc_id = Actor::self()->getPid();
817   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
818   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
819
820   for(int i=0;i<comm_size;i++) {
821     (*sendcounts)[i] = atoi(action[3 + i]);
822     (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
823   }
824   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
825   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
826
827   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
828                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
829                                                        encode_datatype(MPI_CURRENT_TYPE),
830                                                        encode_datatype(MPI_CURRENT_TYPE2)));
831
832   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
833                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
834
835   TRACE_smpi_comm_out(my_proc_id);
836   log_timed_action (action, clock);
837 }
838
839 }} // namespace simgrid::smpi
840
841 /** @brief Only initialize the replay, don't do it for real */
842 void smpi_replay_init(int* argc, char*** argv)
843 {
844   simgrid::smpi::Process::init(argc, argv);
845   smpi_process()->mark_as_initialized();
846   smpi_process()->set_replaying(true);
847
848   int my_proc_id = Actor::self()->getPid();
849   TRACE_smpi_init(my_proc_id);
850   TRACE_smpi_computing_init(my_proc_id);
851   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
852   TRACE_smpi_comm_out(my_proc_id);
853   xbt_replay_action_register("init",       simgrid::smpi::action_init);
854   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
855   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
856   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
857   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
858   xbt_replay_action_register("send",       simgrid::smpi::action_send);
859   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
860   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
861   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
862   xbt_replay_action_register("test",       simgrid::smpi::action_test);
863   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
864   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
865   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
866   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
867   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
868   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
869   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
870   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
871   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
872   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
873   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
874   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
875   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
876   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
877   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
878   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
879
880   //if we have a delayed start, sleep here.
881   if(*argc>2){
882     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
883     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
884     smpi_execute_flops(value);
885   } else {
886     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
887     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
888     smpi_execute_flops(0.0);
889   }
890 }
891
892 /** @brief actually run the replay after initialization */
893 void smpi_replay_main(int* argc, char*** argv)
894 {
895   simgrid::xbt::replay_runner(*argc, *argv);
896
897   /* and now, finalize everything */
898   /* One active process will stop. Decrease the counter*/
899   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
900   if (not get_reqq_self()->empty()) {
901     unsigned int count_requests=get_reqq_self()->size();
902     MPI_Request requests[count_requests];
903     MPI_Status status[count_requests];
904     unsigned int i=0;
905
906     for (auto const& req : *get_reqq_self()) {
907       requests[i] = req;
908       i++;
909     }
910     simgrid::smpi::Request::waitall(count_requests, requests, status);
911   }
912   delete get_reqq_self();
913   active_processes--;
914
915   if(active_processes==0){
916     /* Last process alive speaking: end the simulated timer */
917     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
918     xbt_free(sendbuffer);
919     xbt_free(recvbuffer);
920   }
921
922   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
923
924   smpi_process()->finalize();
925
926   TRACE_smpi_comm_out(Actor::self()->getPid());
927   TRACE_smpi_finalize(Actor::self()->getPid());
928 }
929
930 /** @brief chain a replay initialization and a replay start */
931 void smpi_replay_run(int* argc, char*** argv)
932 {
933   smpi_replay_init(argc, argv);
934   smpi_replay_main(argc, argv);
935 }