Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Move action_reducescatter to std::shared_ptr
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29 static MPI_Datatype MPI_CURRENT_TYPE;
30
31 static int sendbuffer_size = 0;
32 static char* sendbuffer    = nullptr;
33 static int recvbuffer_size = 0;
34 static char* recvbuffer    = nullptr;
35
36 static void log_timed_action (const char *const *action, double clock){
37   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38     char *name = xbt_str_join_array(action, " ");
39     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
40     xbt_free(name);
41   }
42 }
43
44 static std::vector<MPI_Request>* get_reqq_self()
45 {
46   return reqq.at(Actor::self()->getPid());
47 }
48
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 {
51    reqq.insert({Actor::self()->getPid(), mpi_request});
52 }
53
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
56 {
57   if (not smpi_process()->replaying())
58     return xbt_malloc(size);
59   if (sendbuffer_size<size){
60     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
61     sendbuffer_size=size;
62   }
63   return sendbuffer;
64 }
65
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (not smpi_process()->replaying())
69     return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
72     recvbuffer_size=size;
73   }
74   return recvbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (not smpi_process()->replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   char *endptr;
86   double value = strtod(string, &endptr);
87   if (*endptr != '\0')
88     THROWF(unknown_error, 0, "%s is not a double", string);
89   return value;
90 }
91
92
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
94 static MPI_Datatype decode_datatype(const char *const action)
95 {
96   switch(atoi(action)) {
97     case 0:
98       return MPI_DOUBLE;
99       break;
100     case 1:
101       return MPI_INT;
102       break;
103     case 2:
104       return MPI_CHAR;
105       break;
106     case 3:
107       return MPI_SHORT;
108       break;
109     case 4:
110       return MPI_LONG;
111       break;
112     case 5:
113       return MPI_FLOAT;
114       break;
115     case 6:
116       return MPI_BYTE;
117       break;
118     default:
119       return MPI_DEFAULT_TYPE;
120       break;
121   }
122 }
123
124 const char* encode_datatype(MPI_Datatype datatype)
125 {
126   if (datatype==MPI_BYTE)
127       return "";
128   if(datatype==MPI_DOUBLE)
129       return "0";
130   if(datatype==MPI_INT)
131       return "1";
132   if(datatype==MPI_CHAR)
133       return "2";
134   if(datatype==MPI_SHORT)
135       return "3";
136   if(datatype==MPI_LONG)
137     return "4";
138   if(datatype==MPI_FLOAT)
139       return "5";
140   // default - not implemented.
141   // do not warn here as we pass in this function even for other trace formats
142   return "-1";
143 }
144
145 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
146     int i=0;\
147     while(action[i]!=nullptr)\
148      i++;\
149     if(i<mandatory+2)                                           \
150     THROWF(arg_error, 0, "%s replay failed.\n" \
151           "%d items were given on the line. First two should be process_id and action.  " \
152           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
153           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
154   }
155
156 namespace simgrid {
157 namespace smpi {
158
159 static void action_init(const char *const *action)
160 {
161   XBT_DEBUG("Initialize the counters");
162   CHECK_ACTION_PARAMS(action, 0, 1)
163   if(action[2])
164     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
165   else
166     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
167
168   /* start a simulated timer */
169   smpi_process()->simulated_start();
170   /*initialize the number of active processes */
171   active_processes = smpi_process_count();
172
173   set_reqq_self(new std::vector<MPI_Request>);
174 }
175
176 static void action_finalize(const char *const *action)
177 {
178   /* Nothing to do */
179 }
180
181 static void action_comm_size(const char *const *action)
182 {
183   communicator_size = parse_double(action[2]);
184   log_timed_action (action, smpi_process()->simulated_elapsed());
185 }
186
187 static void action_comm_split(const char *const *action)
188 {
189   log_timed_action (action, smpi_process()->simulated_elapsed());
190 }
191
192 static void action_comm_dup(const char *const *action)
193 {
194   log_timed_action (action, smpi_process()->simulated_elapsed());
195 }
196
197 static void action_compute(const char *const *action)
198 {
199   CHECK_ACTION_PARAMS(action, 1, 0)
200   double clock = smpi_process()->simulated_elapsed();
201   double flops= parse_double(action[2]);
202   int my_proc_id = Actor::self()->getPid();
203
204   TRACE_smpi_computing_in(my_proc_id, flops);
205   smpi_execute_flops(flops);
206   TRACE_smpi_computing_out(my_proc_id);
207
208   log_timed_action (action, clock);
209 }
210
211 static void action_send(const char *const *action)
212 {
213   CHECK_ACTION_PARAMS(action, 2, 1)
214   int to = atoi(action[2]);
215   double size=parse_double(action[3]);
216   double clock = smpi_process()->simulated_elapsed();
217
218   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
219
220   int my_proc_id = Actor::self()->getPid();
221   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
222
223   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
224                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
225   if (not TRACE_smpi_view_internals())
226     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
227
228   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
229
230   TRACE_smpi_comm_out(my_proc_id);
231
232   log_timed_action(action, clock);
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   CHECK_ACTION_PARAMS(action, 2, 1)
238   int to = atoi(action[2]);
239   double size=parse_double(action[3]);
240   double clock = smpi_process()->simulated_elapsed();
241
242   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
243
244   int my_proc_id = Actor::self()->getPid();
245   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
246   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
247                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
248   if (not TRACE_smpi_view_internals())
249     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
250
251   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
252
253   TRACE_smpi_comm_out(my_proc_id);
254
255   get_reqq_self()->push_back(request);
256
257   log_timed_action (action, clock);
258 }
259
260 static void action_recv(const char *const *action) {
261   CHECK_ACTION_PARAMS(action, 2, 1)
262   int from = atoi(action[2]);
263   double size=parse_double(action[3]);
264   double clock = smpi_process()->simulated_elapsed();
265   MPI_Status status;
266
267   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
268
269   int my_proc_id = Actor::self()->getPid();
270   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
271
272   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
273                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
274
275   //unknown size from the receiver point of view
276   if (size <= 0.0) {
277     Request::probe(from, 0, MPI_COMM_WORLD, &status);
278     size=status.count;
279   }
280
281   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
282
283   TRACE_smpi_comm_out(my_proc_id);
284   if (not TRACE_smpi_view_internals()) {
285     TRACE_smpi_recv(src_traced, my_proc_id, 0);
286   }
287
288   log_timed_action (action, clock);
289 }
290
291 static void action_Irecv(const char *const *action)
292 {
293   CHECK_ACTION_PARAMS(action, 2, 1)
294   int from = atoi(action[2]);
295   double size=parse_double(action[3]);
296   double clock = smpi_process()->simulated_elapsed();
297
298   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
299
300   int my_proc_id = Actor::self()->getPid();
301   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
302                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
303   MPI_Status status;
304   //unknow size from the receiver pov
305   if (size <= 0.0) {
306     Request::probe(from, 0, MPI_COMM_WORLD, &status);
307     size = status.count;
308   }
309
310   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
311
312   TRACE_smpi_comm_out(my_proc_id);
313   get_reqq_self()->push_back(request);
314
315   log_timed_action (action, clock);
316 }
317
318 static void action_test(const char* const* action)
319 {
320   CHECK_ACTION_PARAMS(action, 0, 0)
321   double clock = smpi_process()->simulated_elapsed();
322   MPI_Status status;
323
324   MPI_Request request = get_reqq_self()->back();
325   get_reqq_self()->pop_back();
326   //if request is null here, this may mean that a previous test has succeeded
327   //Different times in traced application and replayed version may lead to this
328   //In this case, ignore the extra calls.
329   if(request!=nullptr){
330     int my_proc_id = Actor::self()->getPid();
331     TRACE_smpi_testing_in(my_proc_id);
332
333     int flag = Request::test(&request, &status);
334
335     XBT_DEBUG("MPI_Test result: %d", flag);
336     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
337     get_reqq_self()->push_back(request);
338
339     TRACE_smpi_testing_out(my_proc_id);
340   }
341   log_timed_action (action, clock);
342 }
343
344 static void action_wait(const char *const *action){
345   CHECK_ACTION_PARAMS(action, 0, 0)
346   double clock = smpi_process()->simulated_elapsed();
347   MPI_Status status;
348
349   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
350       xbt_str_join_array(action," "));
351   MPI_Request request = get_reqq_self()->back();
352   get_reqq_self()->pop_back();
353
354   if (request==nullptr){
355     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
356     return;
357   }
358
359   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
360
361   MPI_Group group = request->comm()->group();
362   int src_traced = group->rank(request->src());
363   int dst_traced = group->rank(request->dst());
364   int is_wait_for_receive = (request->flags() & RECV);
365   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
366
367   Request::wait(&request, &status);
368
369   TRACE_smpi_comm_out(rank);
370   if (is_wait_for_receive)
371     TRACE_smpi_recv(src_traced, dst_traced, 0);
372   log_timed_action (action, clock);
373 }
374
375 static void action_waitall(const char *const *action){
376   CHECK_ACTION_PARAMS(action, 0, 0)
377   double clock = smpi_process()->simulated_elapsed();
378   const unsigned int count_requests = get_reqq_self()->size();
379
380   if (count_requests>0) {
381     MPI_Status status[count_requests];
382
383     int my_proc_id_traced = Actor::self()->getPid();
384     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
385                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
386     int recvs_snd[count_requests];
387     int recvs_rcv[count_requests];
388     for (unsigned int i = 0; i < count_requests; i++) {
389       const auto& req = (*get_reqq_self())[i];
390       if (req && (req->flags() & RECV)) {
391         recvs_snd[i] = req->src();
392         recvs_rcv[i] = req->dst();
393       } else
394         recvs_snd[i] = -100;
395    }
396    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
397
398    for (unsigned i = 0; i < count_requests; i++) {
399      if (recvs_snd[i]!=-100)
400        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
401    }
402    TRACE_smpi_comm_out(my_proc_id_traced);
403   }
404   log_timed_action (action, clock);
405 }
406
407 static void action_barrier(const char *const *action){
408   double clock = smpi_process()->simulated_elapsed();
409   int my_proc_id = Actor::self()->getPid();
410   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
411
412   Colls::barrier(MPI_COMM_WORLD);
413
414   TRACE_smpi_comm_out(my_proc_id);
415   log_timed_action (action, clock);
416 }
417
418 static void action_bcast(const char *const *action)
419 {
420   CHECK_ACTION_PARAMS(action, 1, 2)
421   double size = parse_double(action[2]);
422   double clock = smpi_process()->simulated_elapsed();
423   int root     = (action[3]) ? atoi(action[3]) : 0;
424   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
425   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
426
427   int my_proc_id = Actor::self()->getPid();
428   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
430                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
431
432   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
433
434   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
435
436   TRACE_smpi_comm_out(my_proc_id);
437   log_timed_action (action, clock);
438 }
439
440 static void action_reduce(const char *const *action)
441 {
442   CHECK_ACTION_PARAMS(action, 2, 2)
443   double comm_size = parse_double(action[2]);
444   double comp_size = parse_double(action[3]);
445   double clock = smpi_process()->simulated_elapsed();
446   int root         = (action[4]) ? atoi(action[4]) : 0;
447
448   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
449
450   int my_proc_id = Actor::self()->getPid();
451   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
452                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
453                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
454
455   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
458   smpi_execute_flops(comp_size);
459
460   TRACE_smpi_comm_out(my_proc_id);
461   log_timed_action (action, clock);
462 }
463
464 static void action_allReduce(const char *const *action) {
465   CHECK_ACTION_PARAMS(action, 2, 1)
466   double comm_size = parse_double(action[2]);
467   double comp_size = parse_double(action[3]);
468
469   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
470
471   double clock = smpi_process()->simulated_elapsed();
472   int my_proc_id = Actor::self()->getPid();
473   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
474                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
475
476   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
479   smpi_execute_flops(comp_size);
480
481   TRACE_smpi_comm_out(my_proc_id);
482   log_timed_action (action, clock);
483 }
484
485 static void action_allToAll(const char *const *action) {
486   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
487   double clock = smpi_process()->simulated_elapsed();
488   int comm_size = MPI_COMM_WORLD->size();
489   int send_size = parse_double(action[2]);
490   int recv_size = parse_double(action[3]);
491   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
492   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
493
494   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
495   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
496
497   int my_proc_id = Actor::self()->getPid();
498   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
499                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
500                                                     encode_datatype(MPI_CURRENT_TYPE),
501                                                     encode_datatype(MPI_CURRENT_TYPE2)));
502
503   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
504
505   TRACE_smpi_comm_out(my_proc_id);
506   log_timed_action (action, clock);
507 }
508
509 static void action_gather(const char *const *action) {
510   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
511         0 gather 68 68 0 0 0
512       where:
513         1) 68 is the sendcounts
514         2) 68 is the recvcounts
515         3) 0 is the root node
516         4) 0 is the send datatype id, see decode_datatype()
517         5) 0 is the recv datatype id, see decode_datatype()
518   */
519   CHECK_ACTION_PARAMS(action, 2, 3)
520   double clock = smpi_process()->simulated_elapsed();
521   int comm_size = MPI_COMM_WORLD->size();
522   int send_size = parse_double(action[2]);
523   int recv_size = parse_double(action[3]);
524   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
525   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
526
527   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528   void *recv = nullptr;
529   int root   = (action[4]) ? atoi(action[4]) : 0;
530   int rank = MPI_COMM_WORLD->rank();
531
532   if(rank==root)
533     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
534
535   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
536                                                                         encode_datatype(MPI_CURRENT_TYPE),
537                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
538
539   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
540
541   TRACE_smpi_comm_out(Actor::self()->getPid());
542   log_timed_action (action, clock);
543 }
544
545 static void action_scatter(const char* const* action)
546 {
547   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
548         0 gather 68 68 0 0 0
549       where:
550         1) 68 is the sendcounts
551         2) 68 is the recvcounts
552         3) 0 is the root node
553         4) 0 is the send datatype id, see decode_datatype()
554         5) 0 is the recv datatype id, see decode_datatype()
555   */
556   CHECK_ACTION_PARAMS(action, 2, 3)
557   double clock                   = smpi_process()->simulated_elapsed();
558   int comm_size                  = MPI_COMM_WORLD->size();
559   int send_size                  = parse_double(action[2]);
560   int recv_size                  = parse_double(action[3]);
561   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
562   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
563
564   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
565   void* recv = nullptr;
566   int root   = (action[4]) ? atoi(action[4]) : 0;
567   int rank = MPI_COMM_WORLD->rank();
568
569   if (rank == root)
570     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
571
572   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
573                                                                         encode_datatype(MPI_CURRENT_TYPE),
574                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
575
576   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
577
578   TRACE_smpi_comm_out(Actor::self()->getPid());
579   log_timed_action(action, clock);
580 }
581
582 static void action_gatherv(const char *const *action) {
583   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
584        0 gather 68 68 10 10 10 0 0 0
585      where:
586        1) 68 is the sendcount
587        2) 68 10 10 10 is the recvcounts
588        3) 0 is the root node
589        4) 0 is the send datatype id, see decode_datatype()
590        5) 0 is the recv datatype id, see decode_datatype()
591   */
592   double clock = smpi_process()->simulated_elapsed();
593   int comm_size = MPI_COMM_WORLD->size();
594   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
595   int send_size = parse_double(action[2]);
596   std::vector<int> disps(comm_size, 0);
597   int recvcounts[comm_size];
598   int recv_sum=0;
599
600   MPI_CURRENT_TYPE =
601       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
602   MPI_Datatype MPI_CURRENT_TYPE2{
603       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
604
605   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
606   void *recv = nullptr;
607   for(int i=0;i<comm_size;i++) {
608     recvcounts[i] = atoi(action[i+3]);
609     recv_sum=recv_sum+recvcounts[i];
610   }
611
612   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
613   int rank = MPI_COMM_WORLD->rank();
614
615   if(rank==root)
616     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
617
618   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
619
620   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
621                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
622                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
623
624   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps.data(), MPI_CURRENT_TYPE2, root,
625                  MPI_COMM_WORLD);
626
627   TRACE_smpi_comm_out(Actor::self()->getPid());
628   log_timed_action (action, clock);
629 }
630
631 static void action_scatterv(const char* const* action)
632 {
633   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
634        0 gather 68 10 10 10 68 0 0 0
635      where:
636        1) 68 10 10 10 is the sendcounts
637        2) 68 is the recvcount
638        3) 0 is the root node
639        4) 0 is the send datatype id, see decode_datatype()
640        5) 0 is the recv datatype id, see decode_datatype()
641   */
642   double clock  = smpi_process()->simulated_elapsed();
643   int comm_size = MPI_COMM_WORLD->size();
644   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
645   int recv_size = parse_double(action[2 + comm_size]);
646   std::vector<int> disps(comm_size, 0);
647   int sendcounts[comm_size];
648   int send_sum = 0;
649
650   MPI_CURRENT_TYPE =
651       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
652   MPI_Datatype MPI_CURRENT_TYPE2{
653       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
654
655   void* send = nullptr;
656   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
657   for (int i = 0; i < comm_size; i++) {
658     sendcounts[i] = atoi(action[i + 2]);
659     send_sum += sendcounts[i];
660   }
661
662   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
663   int rank = MPI_COMM_WORLD->rank();
664
665   if (rank == root)
666     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
667
668   std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
669
670   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
671                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
672                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
673
674   Colls::scatterv(send, sendcounts, disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
675
676   TRACE_smpi_comm_out(Actor::self()->getPid());
677   log_timed_action(action, clock);
678 }
679
680 static void action_reducescatter(const char *const *action) {
681  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
682       0 reduceScatter 275427 275427 275427 204020 11346849 0
683     where:
684       1) The first four values after the name of the action declare the recvcounts array
685       2) The value 11346849 is the amount of instructions
686       3) The last value corresponds to the datatype, see decode_datatype().
687 */
688   double clock = smpi_process()->simulated_elapsed();
689   int comm_size = MPI_COMM_WORLD->size();
690   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
691   int comp_size = parse_double(action[2+comm_size]);
692   int my_proc_id                     = Actor::self()->getPid();
693   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
694   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
695
696   for(int i=0;i<comm_size;i++) {
697     recvcounts->push_back(atoi(action[i + 2]));
698   }
699   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
700
701   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
702                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
703                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
704                                                        encode_datatype(MPI_CURRENT_TYPE)));
705
706   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
707   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
708
709   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
710   smpi_execute_flops(comp_size);
711
712   TRACE_smpi_comm_out(my_proc_id);
713   log_timed_action (action, clock);
714 }
715
716 static void action_allgather(const char *const *action) {
717   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
718         0 allGather 275427 275427
719     where:
720         1) 275427 is the sendcount
721         2) 275427 is the recvcount
722         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
723   */
724   double clock = smpi_process()->simulated_elapsed();
725
726   CHECK_ACTION_PARAMS(action, 2, 2)
727   int sendcount=atoi(action[2]);
728   int recvcount=atoi(action[3]);
729
730   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
731   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
732
733   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
734   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
735
736   int my_proc_id = Actor::self()->getPid();
737
738   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
739                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
740                                                     encode_datatype(MPI_CURRENT_TYPE),
741                                                     encode_datatype(MPI_CURRENT_TYPE2)));
742
743   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
744
745   TRACE_smpi_comm_out(my_proc_id);
746   log_timed_action (action, clock);
747 }
748
749 static void action_allgatherv(const char *const *action) {
750   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
751         0 allGatherV 275427 275427 275427 275427 204020
752      where:
753         1) 275427 is the sendcount
754         2) The next four elements declare the recvcounts array
755         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
756   */
757   double clock = smpi_process()->simulated_elapsed();
758
759   int comm_size = MPI_COMM_WORLD->size();
760   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
761   int sendcount=atoi(action[2]);
762   int recvcounts[comm_size];
763   std::vector<int> disps(comm_size, 0);
764   int recv_sum=0;
765
766   MPI_CURRENT_TYPE =
767       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
768   MPI_Datatype MPI_CURRENT_TYPE2{
769       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
770
771   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
772
773   for(int i=0;i<comm_size;i++) {
774     recvcounts[i] = atoi(action[i+3]);
775     recv_sum=recv_sum+recvcounts[i];
776   }
777   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
778
779   int my_proc_id = Actor::self()->getPid();
780
781   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
782
783   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
784                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
785                                                        encode_datatype(MPI_CURRENT_TYPE),
786                                                        encode_datatype(MPI_CURRENT_TYPE2)));
787
788   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
789                           MPI_COMM_WORLD);
790
791   TRACE_smpi_comm_out(my_proc_id);
792   log_timed_action (action, clock);
793 }
794
795 static void action_allToAllv(const char *const *action) {
796   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
797         0 allToAllV 100 1 7 10 12 100 1 70 10 5
798      where:
799         1) 100 is the size of the send buffer *sizeof(int),
800         2) 1 7 10 12 is the sendcounts array
801         3) 100*sizeof(int) is the size of the receiver buffer
802         4)  1 70 10 5 is the recvcounts array
803   */
804   double clock = smpi_process()->simulated_elapsed();
805
806   int comm_size = MPI_COMM_WORLD->size();
807   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
808   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
809   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
810   std::vector<int> senddisps(comm_size, 0);
811   std::vector<int> recvdisps(comm_size, 0);
812
813   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
814                          ? decode_datatype(action[4 + 2 * comm_size])
815                          : MPI_DEFAULT_TYPE;
816   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
817                                      ? decode_datatype(action[5 + 2 * comm_size])
818                                      : MPI_DEFAULT_TYPE};
819
820   int send_buf_size=parse_double(action[2]);
821   int recv_buf_size=parse_double(action[3+comm_size]);
822   int my_proc_id = Actor::self()->getPid();
823   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
824   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
825
826   for(int i=0;i<comm_size;i++) {
827     (*sendcounts)[i] = atoi(action[3 + i]);
828     (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
829   }
830   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
831   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
832
833   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
834                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
835                                                        encode_datatype(MPI_CURRENT_TYPE),
836                                                        encode_datatype(MPI_CURRENT_TYPE2)));
837
838   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
839                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
840
841   TRACE_smpi_comm_out(my_proc_id);
842   log_timed_action (action, clock);
843 }
844
845 }} // namespace simgrid::smpi
846
847 /** @brief Only initialize the replay, don't do it for real */
848 void smpi_replay_init(int* argc, char*** argv)
849 {
850   simgrid::smpi::Process::init(argc, argv);
851   smpi_process()->mark_as_initialized();
852   smpi_process()->set_replaying(true);
853
854   int my_proc_id = Actor::self()->getPid();
855   TRACE_smpi_init(my_proc_id);
856   TRACE_smpi_computing_init(my_proc_id);
857   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
858   TRACE_smpi_comm_out(my_proc_id);
859   xbt_replay_action_register("init",       simgrid::smpi::action_init);
860   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
861   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
862   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
863   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
864   xbt_replay_action_register("send",       simgrid::smpi::action_send);
865   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
866   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
867   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
868   xbt_replay_action_register("test",       simgrid::smpi::action_test);
869   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
870   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
871   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
872   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
873   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
874   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
875   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
876   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
877   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
878   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
879   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
880   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
881   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
882   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
883   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
884   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
885
886   //if we have a delayed start, sleep here.
887   if(*argc>2){
888     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
889     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
890     smpi_execute_flops(value);
891   } else {
892     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
893     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
894     smpi_execute_flops(0.0);
895   }
896 }
897
898 /** @brief actually run the replay after initialization */
899 void smpi_replay_main(int* argc, char*** argv)
900 {
901   simgrid::xbt::replay_runner(*argc, *argv);
902
903   /* and now, finalize everything */
904   /* One active process will stop. Decrease the counter*/
905   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
906   if (not get_reqq_self()->empty()) {
907     unsigned int count_requests=get_reqq_self()->size();
908     MPI_Request requests[count_requests];
909     MPI_Status status[count_requests];
910     unsigned int i=0;
911
912     for (auto const& req : *get_reqq_self()) {
913       requests[i] = req;
914       i++;
915     }
916     simgrid::smpi::Request::waitall(count_requests, requests, status);
917   }
918   delete get_reqq_self();
919   active_processes--;
920
921   if(active_processes==0){
922     /* Last process alive speaking: end the simulated timer */
923     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
924     xbt_free(sendbuffer);
925     xbt_free(recvbuffer);
926   }
927
928   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
929
930   smpi_process()->finalize();
931
932   TRACE_smpi_comm_out(Actor::self()->getPid());
933   TRACE_smpi_finalize(Actor::self()->getPid());
934 }
935
936 /** @brief chain a replay initialization and a replay start */
937 void smpi_replay_run(int* argc, char*** argv)
938 {
939   smpi_replay_init(argc, argv);
940   smpi_replay_main(argc, argv);
941 }