Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Move disps arrays to vectors
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <numeric>
16 #include <unordered_map>
17 #include <vector>
18
19 using simgrid::s4u::Actor;
20
21 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
22
23 static int communicator_size = 0;
24 static int active_processes  = 0;
25 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
26
27 static MPI_Datatype MPI_DEFAULT_TYPE;
28 static MPI_Datatype MPI_CURRENT_TYPE;
29
30 static int sendbuffer_size = 0;
31 static char* sendbuffer    = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer    = nullptr;
34
35 static void log_timed_action (const char *const *action, double clock){
36   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
37     char *name = xbt_str_join_array(action, " ");
38     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
39     xbt_free(name);
40   }
41 }
42
43 static std::vector<MPI_Request>* get_reqq_self()
44 {
45   return reqq.at(Actor::self()->getPid());
46 }
47
48 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 {
50    reqq.insert({Actor::self()->getPid(), mpi_request});
51 }
52
53 //allocate a single buffer for all sends, growing it if needed
54 void* smpi_get_tmp_sendbuffer(int size)
55 {
56   if (not smpi_process()->replaying())
57     return xbt_malloc(size);
58   if (sendbuffer_size<size){
59     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
60     sendbuffer_size=size;
61   }
62   return sendbuffer;
63 }
64
65 //allocate a single buffer for all recv
66 void* smpi_get_tmp_recvbuffer(int size){
67   if (not smpi_process()->replaying())
68     return xbt_malloc(size);
69   if (recvbuffer_size<size){
70     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
71     recvbuffer_size=size;
72   }
73   return recvbuffer;
74 }
75
76 void smpi_free_tmp_buffer(void* buf){
77   if (not smpi_process()->replaying())
78     xbt_free(buf);
79 }
80
81 /* Helper function */
82 static double parse_double(const char *string)
83 {
84   char *endptr;
85   double value = strtod(string, &endptr);
86   if (*endptr != '\0')
87     THROWF(unknown_error, 0, "%s is not a double", string);
88   return value;
89 }
90
91
92 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
93 static MPI_Datatype decode_datatype(const char *const action)
94 {
95   switch(atoi(action)) {
96     case 0:
97       return MPI_DOUBLE;
98       break;
99     case 1:
100       return MPI_INT;
101       break;
102     case 2:
103       return MPI_CHAR;
104       break;
105     case 3:
106       return MPI_SHORT;
107       break;
108     case 4:
109       return MPI_LONG;
110       break;
111     case 5:
112       return MPI_FLOAT;
113       break;
114     case 6:
115       return MPI_BYTE;
116       break;
117     default:
118       return MPI_DEFAULT_TYPE;
119       break;
120   }
121 }
122
123 const char* encode_datatype(MPI_Datatype datatype)
124 {
125   if (datatype==MPI_BYTE)
126       return "";
127   if(datatype==MPI_DOUBLE)
128       return "0";
129   if(datatype==MPI_INT)
130       return "1";
131   if(datatype==MPI_CHAR)
132       return "2";
133   if(datatype==MPI_SHORT)
134       return "3";
135   if(datatype==MPI_LONG)
136     return "4";
137   if(datatype==MPI_FLOAT)
138       return "5";
139   // default - not implemented.
140   // do not warn here as we pass in this function even for other trace formats
141   return "-1";
142 }
143
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145     int i=0;\
146     while(action[i]!=nullptr)\
147      i++;\
148     if(i<mandatory+2)                                           \
149     THROWF(arg_error, 0, "%s replay failed.\n" \
150           "%d items were given on the line. First two should be process_id and action.  " \
151           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
153   }
154
155 namespace simgrid {
156 namespace smpi {
157
158 static void action_init(const char *const *action)
159 {
160   XBT_DEBUG("Initialize the counters");
161   CHECK_ACTION_PARAMS(action, 0, 1)
162   if(action[2])
163     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
164   else
165     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process()->simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* Nothing to do */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   communicator_size = parse_double(action[2]);
183   log_timed_action (action, smpi_process()->simulated_elapsed());
184 }
185
186 static void action_comm_split(const char *const *action)
187 {
188   log_timed_action (action, smpi_process()->simulated_elapsed());
189 }
190
191 static void action_comm_dup(const char *const *action)
192 {
193   log_timed_action (action, smpi_process()->simulated_elapsed());
194 }
195
196 static void action_compute(const char *const *action)
197 {
198   CHECK_ACTION_PARAMS(action, 1, 0)
199   double clock = smpi_process()->simulated_elapsed();
200   double flops= parse_double(action[2]);
201   int my_proc_id = Actor::self()->getPid();
202
203   TRACE_smpi_computing_in(my_proc_id, flops);
204   smpi_execute_flops(flops);
205   TRACE_smpi_computing_out(my_proc_id);
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_send(const char *const *action)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   int to = atoi(action[2]);
214   double size=parse_double(action[3]);
215   double clock = smpi_process()->simulated_elapsed();
216
217   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
218
219   int my_proc_id = Actor::self()->getPid();
220   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
221
222   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
223                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
224   if (not TRACE_smpi_view_internals())
225     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
226
227   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
228
229   TRACE_smpi_comm_out(my_proc_id);
230
231   log_timed_action(action, clock);
232 }
233
234 static void action_Isend(const char *const *action)
235 {
236   CHECK_ACTION_PARAMS(action, 2, 1)
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process()->simulated_elapsed();
240
241   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
242
243   int my_proc_id = Actor::self()->getPid();
244   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
245   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
246                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
247   if (not TRACE_smpi_view_internals())
248     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
249
250   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
251
252   TRACE_smpi_comm_out(my_proc_id);
253
254   get_reqq_self()->push_back(request);
255
256   log_timed_action (action, clock);
257 }
258
259 static void action_recv(const char *const *action) {
260   CHECK_ACTION_PARAMS(action, 2, 1)
261   int from = atoi(action[2]);
262   double size=parse_double(action[3]);
263   double clock = smpi_process()->simulated_elapsed();
264   MPI_Status status;
265
266   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
267
268   int my_proc_id = Actor::self()->getPid();
269   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
270
271   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
272                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
273
274   //unknown size from the receiver point of view
275   if (size <= 0.0) {
276     Request::probe(from, 0, MPI_COMM_WORLD, &status);
277     size=status.count;
278   }
279
280   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
281
282   TRACE_smpi_comm_out(my_proc_id);
283   if (not TRACE_smpi_view_internals()) {
284     TRACE_smpi_recv(src_traced, my_proc_id, 0);
285   }
286
287   log_timed_action (action, clock);
288 }
289
290 static void action_Irecv(const char *const *action)
291 {
292   CHECK_ACTION_PARAMS(action, 2, 1)
293   int from = atoi(action[2]);
294   double size=parse_double(action[3]);
295   double clock = smpi_process()->simulated_elapsed();
296
297   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
298
299   int my_proc_id = Actor::self()->getPid();
300   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
301                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
302   MPI_Status status;
303   //unknow size from the receiver pov
304   if (size <= 0.0) {
305     Request::probe(from, 0, MPI_COMM_WORLD, &status);
306     size = status.count;
307   }
308
309   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
310
311   TRACE_smpi_comm_out(my_proc_id);
312   get_reqq_self()->push_back(request);
313
314   log_timed_action (action, clock);
315 }
316
317 static void action_test(const char* const* action)
318 {
319   CHECK_ACTION_PARAMS(action, 0, 0)
320   double clock = smpi_process()->simulated_elapsed();
321   MPI_Status status;
322
323   MPI_Request request = get_reqq_self()->back();
324   get_reqq_self()->pop_back();
325   //if request is null here, this may mean that a previous test has succeeded
326   //Different times in traced application and replayed version may lead to this
327   //In this case, ignore the extra calls.
328   if(request!=nullptr){
329     int my_proc_id = Actor::self()->getPid();
330     TRACE_smpi_testing_in(my_proc_id);
331
332     int flag = Request::test(&request, &status);
333
334     XBT_DEBUG("MPI_Test result: %d", flag);
335     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
336     get_reqq_self()->push_back(request);
337
338     TRACE_smpi_testing_out(my_proc_id);
339   }
340   log_timed_action (action, clock);
341 }
342
343 static void action_wait(const char *const *action){
344   CHECK_ACTION_PARAMS(action, 0, 0)
345   double clock = smpi_process()->simulated_elapsed();
346   MPI_Status status;
347
348   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
349       xbt_str_join_array(action," "));
350   MPI_Request request = get_reqq_self()->back();
351   get_reqq_self()->pop_back();
352
353   if (request==nullptr){
354     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
355     return;
356   }
357
358   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
359
360   MPI_Group group = request->comm()->group();
361   int src_traced = group->rank(request->src());
362   int dst_traced = group->rank(request->dst());
363   int is_wait_for_receive = (request->flags() & RECV);
364   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
365
366   Request::wait(&request, &status);
367
368   TRACE_smpi_comm_out(rank);
369   if (is_wait_for_receive)
370     TRACE_smpi_recv(src_traced, dst_traced, 0);
371   log_timed_action (action, clock);
372 }
373
374 static void action_waitall(const char *const *action){
375   CHECK_ACTION_PARAMS(action, 0, 0)
376   double clock = smpi_process()->simulated_elapsed();
377   const unsigned int count_requests = get_reqq_self()->size();
378
379   if (count_requests>0) {
380     MPI_Status status[count_requests];
381
382     int my_proc_id_traced = Actor::self()->getPid();
383     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
384                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
385     int recvs_snd[count_requests];
386     int recvs_rcv[count_requests];
387     for (unsigned int i = 0; i < count_requests; i++) {
388       const auto& req = (*get_reqq_self())[i];
389       if (req && (req->flags() & RECV)) {
390         recvs_snd[i] = req->src();
391         recvs_rcv[i] = req->dst();
392       } else
393         recvs_snd[i] = -100;
394    }
395    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
396
397    for (unsigned i = 0; i < count_requests; i++) {
398      if (recvs_snd[i]!=-100)
399        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
400    }
401    TRACE_smpi_comm_out(my_proc_id_traced);
402   }
403   log_timed_action (action, clock);
404 }
405
406 static void action_barrier(const char *const *action){
407   double clock = smpi_process()->simulated_elapsed();
408   int my_proc_id = Actor::self()->getPid();
409   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
410
411   Colls::barrier(MPI_COMM_WORLD);
412
413   TRACE_smpi_comm_out(my_proc_id);
414   log_timed_action (action, clock);
415 }
416
417 static void action_bcast(const char *const *action)
418 {
419   CHECK_ACTION_PARAMS(action, 1, 2)
420   double size = parse_double(action[2]);
421   double clock = smpi_process()->simulated_elapsed();
422   int root     = (action[3]) ? atoi(action[3]) : 0;
423   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
424   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
425
426   int my_proc_id = Actor::self()->getPid();
427   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
428                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
429                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
430
431   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
432
433   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
434
435   TRACE_smpi_comm_out(my_proc_id);
436   log_timed_action (action, clock);
437 }
438
439 static void action_reduce(const char *const *action)
440 {
441   CHECK_ACTION_PARAMS(action, 2, 2)
442   double comm_size = parse_double(action[2]);
443   double comp_size = parse_double(action[3]);
444   double clock = smpi_process()->simulated_elapsed();
445   int root         = (action[4]) ? atoi(action[4]) : 0;
446
447   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
448
449   int my_proc_id = Actor::self()->getPid();
450   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
451                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
452                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
453
454   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
455   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
457   smpi_execute_flops(comp_size);
458
459   TRACE_smpi_comm_out(my_proc_id);
460   log_timed_action (action, clock);
461 }
462
463 static void action_allReduce(const char *const *action) {
464   CHECK_ACTION_PARAMS(action, 2, 1)
465   double comm_size = parse_double(action[2]);
466   double comp_size = parse_double(action[3]);
467
468   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
469
470   double clock = smpi_process()->simulated_elapsed();
471   int my_proc_id = Actor::self()->getPid();
472   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
473                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
474
475   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
476   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
478   smpi_execute_flops(comp_size);
479
480   TRACE_smpi_comm_out(my_proc_id);
481   log_timed_action (action, clock);
482 }
483
484 static void action_allToAll(const char *const *action) {
485   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
486   double clock = smpi_process()->simulated_elapsed();
487   int comm_size = MPI_COMM_WORLD->size();
488   int send_size = parse_double(action[2]);
489   int recv_size = parse_double(action[3]);
490   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
491   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
492
493   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
494   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
495
496   int my_proc_id = Actor::self()->getPid();
497   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
498                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
499                                                     encode_datatype(MPI_CURRENT_TYPE),
500                                                     encode_datatype(MPI_CURRENT_TYPE2)));
501
502   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
503
504   TRACE_smpi_comm_out(my_proc_id);
505   log_timed_action (action, clock);
506 }
507
508 static void action_gather(const char *const *action) {
509   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
510         0 gather 68 68 0 0 0
511       where:
512         1) 68 is the sendcounts
513         2) 68 is the recvcounts
514         3) 0 is the root node
515         4) 0 is the send datatype id, see decode_datatype()
516         5) 0 is the recv datatype id, see decode_datatype()
517   */
518   CHECK_ACTION_PARAMS(action, 2, 3)
519   double clock = smpi_process()->simulated_elapsed();
520   int comm_size = MPI_COMM_WORLD->size();
521   int send_size = parse_double(action[2]);
522   int recv_size = parse_double(action[3]);
523   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
524   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
525
526   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
527   void *recv = nullptr;
528   int root   = (action[4]) ? atoi(action[4]) : 0;
529   int rank = MPI_COMM_WORLD->rank();
530
531   if(rank==root)
532     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
533
534   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
535                                                                         encode_datatype(MPI_CURRENT_TYPE),
536                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
537
538   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
539
540   TRACE_smpi_comm_out(Actor::self()->getPid());
541   log_timed_action (action, clock);
542 }
543
544 static void action_scatter(const char* const* action)
545 {
546   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
547         0 gather 68 68 0 0 0
548       where:
549         1) 68 is the sendcounts
550         2) 68 is the recvcounts
551         3) 0 is the root node
552         4) 0 is the send datatype id, see decode_datatype()
553         5) 0 is the recv datatype id, see decode_datatype()
554   */
555   CHECK_ACTION_PARAMS(action, 2, 3)
556   double clock                   = smpi_process()->simulated_elapsed();
557   int comm_size                  = MPI_COMM_WORLD->size();
558   int send_size                  = parse_double(action[2]);
559   int recv_size                  = parse_double(action[3]);
560   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
561   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
562
563   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
564   void* recv = nullptr;
565   int root   = (action[4]) ? atoi(action[4]) : 0;
566   int rank = MPI_COMM_WORLD->rank();
567
568   if (rank == root)
569     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
570
571   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
572                                                                         encode_datatype(MPI_CURRENT_TYPE),
573                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
574
575   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
576
577   TRACE_smpi_comm_out(Actor::self()->getPid());
578   log_timed_action(action, clock);
579 }
580
581 static void action_gatherv(const char *const *action) {
582   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
583        0 gather 68 68 10 10 10 0 0 0
584      where:
585        1) 68 is the sendcount
586        2) 68 10 10 10 is the recvcounts
587        3) 0 is the root node
588        4) 0 is the send datatype id, see decode_datatype()
589        5) 0 is the recv datatype id, see decode_datatype()
590   */
591   double clock = smpi_process()->simulated_elapsed();
592   int comm_size = MPI_COMM_WORLD->size();
593   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
594   int send_size = parse_double(action[2]);
595   std::vector<int> disps(comm_size, 0);
596   int recvcounts[comm_size];
597   int recv_sum=0;
598
599   MPI_CURRENT_TYPE =
600       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
601   MPI_Datatype MPI_CURRENT_TYPE2{
602       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
603
604   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
605   void *recv = nullptr;
606   for(int i=0;i<comm_size;i++) {
607     recvcounts[i] = atoi(action[i+3]);
608     recv_sum=recv_sum+recvcounts[i];
609   }
610
611   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
612   int rank = MPI_COMM_WORLD->rank();
613
614   if(rank==root)
615     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
616
617   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
618
619   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
620                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
621                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
622
623   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps.data(), MPI_CURRENT_TYPE2, root,
624                  MPI_COMM_WORLD);
625
626   TRACE_smpi_comm_out(Actor::self()->getPid());
627   log_timed_action (action, clock);
628 }
629
630 static void action_scatterv(const char* const* action)
631 {
632   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
633        0 gather 68 10 10 10 68 0 0 0
634      where:
635        1) 68 10 10 10 is the sendcounts
636        2) 68 is the recvcount
637        3) 0 is the root node
638        4) 0 is the send datatype id, see decode_datatype()
639        5) 0 is the recv datatype id, see decode_datatype()
640   */
641   double clock  = smpi_process()->simulated_elapsed();
642   int comm_size = MPI_COMM_WORLD->size();
643   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
644   int recv_size = parse_double(action[2 + comm_size]);
645   std::vector<int> disps(comm_size, 0);
646   int sendcounts[comm_size];
647   int send_sum = 0;
648
649   MPI_CURRENT_TYPE =
650       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
651   MPI_Datatype MPI_CURRENT_TYPE2{
652       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
653
654   void* send = nullptr;
655   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
656   for (int i = 0; i < comm_size; i++) {
657     sendcounts[i] = atoi(action[i + 2]);
658     send_sum += sendcounts[i];
659   }
660
661   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
662   int rank = MPI_COMM_WORLD->rank();
663
664   if (rank == root)
665     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
666
667   std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
668
669   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
670                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
671                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
672
673   Colls::scatterv(send, sendcounts, disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
674
675   TRACE_smpi_comm_out(Actor::self()->getPid());
676   log_timed_action(action, clock);
677 }
678
679 static void action_reducescatter(const char *const *action) {
680  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
681       0 reduceScatter 275427 275427 275427 204020 11346849 0
682     where:
683       1) The first four values after the name of the action declare the recvcounts array
684       2) The value 11346849 is the amount of instructions
685       3) The last value corresponds to the datatype, see decode_datatype().
686 */
687   double clock = smpi_process()->simulated_elapsed();
688   int comm_size = MPI_COMM_WORLD->size();
689   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
690   int comp_size = parse_double(action[2+comm_size]);
691   int my_proc_id                     = Actor::self()->getPid();
692   std::vector<int>* trace_recvcounts = new std::vector<int>;
693   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
694
695   for(int i=0;i<comm_size;i++) {
696     trace_recvcounts->push_back(atoi(action[i + 2]));
697   }
698   int size{std::accumulate(trace_recvcounts->begin(), trace_recvcounts->end(), 0)};
699
700   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
701                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
702                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
703                                                        encode_datatype(MPI_CURRENT_TYPE)));
704
705   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
706   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
707
708   Colls::reduce_scatter(sendbuf, recvbuf, trace_recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
709   smpi_execute_flops(comp_size);
710
711   TRACE_smpi_comm_out(my_proc_id);
712   log_timed_action (action, clock);
713 }
714
715 static void action_allgather(const char *const *action) {
716   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
717         0 allGather 275427 275427
718     where:
719         1) 275427 is the sendcount
720         2) 275427 is the recvcount
721         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
722   */
723   double clock = smpi_process()->simulated_elapsed();
724
725   CHECK_ACTION_PARAMS(action, 2, 2)
726   int sendcount=atoi(action[2]);
727   int recvcount=atoi(action[3]);
728
729   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
730   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
731
732   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
733   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
734
735   int my_proc_id = Actor::self()->getPid();
736
737   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
738                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
739                                                     encode_datatype(MPI_CURRENT_TYPE),
740                                                     encode_datatype(MPI_CURRENT_TYPE2)));
741
742   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
743
744   TRACE_smpi_comm_out(my_proc_id);
745   log_timed_action (action, clock);
746 }
747
748 static void action_allgatherv(const char *const *action) {
749   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
750         0 allGatherV 275427 275427 275427 275427 204020
751      where:
752         1) 275427 is the sendcount
753         2) The next four elements declare the recvcounts array
754         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
755   */
756   double clock = smpi_process()->simulated_elapsed();
757
758   int comm_size = MPI_COMM_WORLD->size();
759   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
760   int sendcount=atoi(action[2]);
761   int recvcounts[comm_size];
762   std::vector<int> disps(comm_size, 0);
763   int recv_sum=0;
764
765   MPI_CURRENT_TYPE =
766       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
767   MPI_Datatype MPI_CURRENT_TYPE2{
768       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
769
770   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
771
772   for(int i=0;i<comm_size;i++) {
773     recvcounts[i] = atoi(action[i+3]);
774     recv_sum=recv_sum+recvcounts[i];
775   }
776   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
777
778   int my_proc_id = Actor::self()->getPid();
779
780   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
781
782   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
783                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
784                                                        encode_datatype(MPI_CURRENT_TYPE),
785                                                        encode_datatype(MPI_CURRENT_TYPE2)));
786
787   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
788                           MPI_COMM_WORLD);
789
790   TRACE_smpi_comm_out(my_proc_id);
791   log_timed_action (action, clock);
792 }
793
794 static void action_allToAllv(const char *const *action) {
795   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
796         0 allToAllV 100 1 7 10 12 100 1 70 10 5
797      where:
798         1) 100 is the size of the send buffer *sizeof(int),
799         2) 1 7 10 12 is the sendcounts array
800         3) 100*sizeof(int) is the size of the receiver buffer
801         4)  1 70 10 5 is the recvcounts array
802   */
803   double clock = smpi_process()->simulated_elapsed();
804
805   int comm_size = MPI_COMM_WORLD->size();
806   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
807   int send_size = 0;
808   int recv_size = 0;
809   int sendcounts[comm_size];
810   std::vector<int>* trace_sendcounts = new std::vector<int>;
811   int recvcounts[comm_size];
812   std::vector<int>* trace_recvcounts = new std::vector<int>;
813   std::vector<int> senddisps(comm_size, 0);
814   std::vector<int> recvdisps(comm_size, 0);
815
816   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
817                          ? decode_datatype(action[4 + 2 * comm_size])
818                          : MPI_DEFAULT_TYPE;
819   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
820                                      ? decode_datatype(action[5 + 2 * comm_size])
821                                      : MPI_DEFAULT_TYPE};
822
823   int send_buf_size=parse_double(action[2]);
824   int recv_buf_size=parse_double(action[3+comm_size]);
825   int my_proc_id = Actor::self()->getPid();
826   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
827   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
828
829   for(int i=0;i<comm_size;i++) {
830     sendcounts[i] = atoi(action[i+3]);
831     trace_sendcounts->push_back(sendcounts[i]);
832     send_size += sendcounts[i];
833     recvcounts[i] = atoi(action[i+4+comm_size]);
834     trace_recvcounts->push_back(recvcounts[i]);
835     recv_size += recvcounts[i];
836   }
837
838   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
839                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
840                                                        trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
841                                                        encode_datatype(MPI_CURRENT_TYPE2)));
842
843   Colls::alltoallv(sendbuf, sendcounts, senddisps.data(), MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps.data(),
844                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
845
846   TRACE_smpi_comm_out(my_proc_id);
847   log_timed_action (action, clock);
848 }
849
850 }} // namespace simgrid::smpi
851
852 /** @brief Only initialize the replay, don't do it for real */
853 void smpi_replay_init(int* argc, char*** argv)
854 {
855   simgrid::smpi::Process::init(argc, argv);
856   smpi_process()->mark_as_initialized();
857   smpi_process()->set_replaying(true);
858
859   int my_proc_id = Actor::self()->getPid();
860   TRACE_smpi_init(my_proc_id);
861   TRACE_smpi_computing_init(my_proc_id);
862   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
863   TRACE_smpi_comm_out(my_proc_id);
864   xbt_replay_action_register("init",       simgrid::smpi::action_init);
865   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
866   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
867   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
868   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
869   xbt_replay_action_register("send",       simgrid::smpi::action_send);
870   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
871   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
872   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
873   xbt_replay_action_register("test",       simgrid::smpi::action_test);
874   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
875   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
876   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
877   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
878   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
879   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
880   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
881   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
882   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
883   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
884   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
885   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
886   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
887   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
888   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
889   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
890
891   //if we have a delayed start, sleep here.
892   if(*argc>2){
893     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
894     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
895     smpi_execute_flops(value);
896   } else {
897     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
898     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
899     smpi_execute_flops(0.0);
900   }
901 }
902
903 /** @brief actually run the replay after initialization */
904 void smpi_replay_main(int* argc, char*** argv)
905 {
906   simgrid::xbt::replay_runner(*argc, *argv);
907
908   /* and now, finalize everything */
909   /* One active process will stop. Decrease the counter*/
910   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
911   if (not get_reqq_self()->empty()) {
912     unsigned int count_requests=get_reqq_self()->size();
913     MPI_Request requests[count_requests];
914     MPI_Status status[count_requests];
915     unsigned int i=0;
916
917     for (auto const& req : *get_reqq_self()) {
918       requests[i] = req;
919       i++;
920     }
921     simgrid::smpi::Request::waitall(count_requests, requests, status);
922   }
923   delete get_reqq_self();
924   active_processes--;
925
926   if(active_processes==0){
927     /* Last process alive speaking: end the simulated timer */
928     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
929     xbt_free(sendbuffer);
930     xbt_free(recvbuffer);
931   }
932
933   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
934
935   smpi_process()->finalize();
936
937   TRACE_smpi_comm_out(Actor::self()->getPid());
938   TRACE_smpi_finalize(Actor::self()->getPid());
939 }
940
941 /** @brief chain a replay initialization and a replay start */
942 void smpi_replay_run(int* argc, char*** argv)
943 {
944   smpi_replay_init(argc, argv);
945   smpi_replay_main(argc, argv);
946 }