Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: C++ify action_allgatherv
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 static int sendbuffer_size = 0;
31 static char* sendbuffer    = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer    = nullptr;
34
35 class ReplayActionArg {
36   ReplayActionArg() {}
37 };
38
39 static void log_timed_action (const char *const *action, double clock){
40   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
41     char *name = xbt_str_join_array(action, " ");
42     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
43     xbt_free(name);
44   }
45 }
46
47 static std::vector<MPI_Request>* get_reqq_self()
48 {
49   return reqq.at(Actor::self()->getPid());
50 }
51
52 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
53 {
54    reqq.insert({Actor::self()->getPid(), mpi_request});
55 }
56
57 //allocate a single buffer for all sends, growing it if needed
58 void* smpi_get_tmp_sendbuffer(int size)
59 {
60   if (not smpi_process()->replaying())
61     return xbt_malloc(size);
62   if (sendbuffer_size<size){
63     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64     sendbuffer_size=size;
65   }
66   return sendbuffer;
67 }
68
69 //allocate a single buffer for all recv
70 void* smpi_get_tmp_recvbuffer(int size){
71   if (not smpi_process()->replaying())
72     return xbt_malloc(size);
73   if (recvbuffer_size<size){
74     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
75     recvbuffer_size=size;
76   }
77   return recvbuffer;
78 }
79
80 void smpi_free_tmp_buffer(void* buf){
81   if (not smpi_process()->replaying())
82     xbt_free(buf);
83 }
84
85 /* Helper function */
86 static double parse_double(const char *string)
87 {
88   char *endptr;
89   double value = strtod(string, &endptr);
90   if (*endptr != '\0')
91     THROWF(unknown_error, 0, "%s is not a double", string);
92   return value;
93 }
94
95
96 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
97 static MPI_Datatype decode_datatype(const char *const action)
98 {
99   switch(atoi(action)) {
100     case 0:
101       return MPI_DOUBLE;
102       break;
103     case 1:
104       return MPI_INT;
105       break;
106     case 2:
107       return MPI_CHAR;
108       break;
109     case 3:
110       return MPI_SHORT;
111       break;
112     case 4:
113       return MPI_LONG;
114       break;
115     case 5:
116       return MPI_FLOAT;
117       break;
118     case 6:
119       return MPI_BYTE;
120       break;
121     default:
122       return MPI_DEFAULT_TYPE;
123       break;
124   }
125 }
126
127 const char* encode_datatype(MPI_Datatype datatype)
128 {
129   if (datatype==MPI_BYTE)
130       return "";
131   if(datatype==MPI_DOUBLE)
132       return "0";
133   if(datatype==MPI_INT)
134       return "1";
135   if(datatype==MPI_CHAR)
136       return "2";
137   if(datatype==MPI_SHORT)
138       return "3";
139   if(datatype==MPI_LONG)
140     return "4";
141   if(datatype==MPI_FLOAT)
142       return "5";
143   // default - not implemented.
144   // do not warn here as we pass in this function even for other trace formats
145   return "-1";
146 }
147
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
149     int i=0;\
150     while(action[i]!=nullptr)\
151      i++;\
152     if(i<mandatory+2)                                           \
153     THROWF(arg_error, 0, "%s replay failed.\n" \
154           "%d items were given on the line. First two should be process_id and action.  " \
155           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
157   }
158
159 namespace simgrid {
160 namespace smpi {
161
162 static void action_init(const char *const *action)
163 {
164   XBT_DEBUG("Initialize the counters");
165   CHECK_ACTION_PARAMS(action, 0, 1)
166   if(action[2])
167     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
168   else
169     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
170
171   /* start a simulated timer */
172   smpi_process()->simulated_start();
173   /*initialize the number of active processes */
174   active_processes = smpi_process_count();
175
176   set_reqq_self(new std::vector<MPI_Request>);
177 }
178
179 static void action_finalize(const char *const *action)
180 {
181   /* Nothing to do */
182 }
183
184 static void action_comm_size(const char *const *action)
185 {
186   communicator_size = parse_double(action[2]);
187   log_timed_action (action, smpi_process()->simulated_elapsed());
188 }
189
190 static void action_comm_split(const char *const *action)
191 {
192   log_timed_action (action, smpi_process()->simulated_elapsed());
193 }
194
195 static void action_comm_dup(const char *const *action)
196 {
197   log_timed_action (action, smpi_process()->simulated_elapsed());
198 }
199
200 static void action_compute(const char *const *action)
201 {
202   CHECK_ACTION_PARAMS(action, 1, 0)
203   double clock = smpi_process()->simulated_elapsed();
204   double flops= parse_double(action[2]);
205   int my_proc_id = Actor::self()->getPid();
206
207   TRACE_smpi_computing_in(my_proc_id, flops);
208   smpi_execute_flops(flops);
209   TRACE_smpi_computing_out(my_proc_id);
210
211   log_timed_action (action, clock);
212 }
213
214 static void action_send(const char *const *action)
215 {
216   CHECK_ACTION_PARAMS(action, 2, 1)
217   int to = atoi(action[2]);
218   double size=parse_double(action[3]);
219   double clock = smpi_process()->simulated_elapsed();
220
221   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
222
223   int my_proc_id = Actor::self()->getPid();
224   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
225
226   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
227                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
228   if (not TRACE_smpi_view_internals())
229     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
230
231   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
232
233   TRACE_smpi_comm_out(my_proc_id);
234
235   log_timed_action(action, clock);
236 }
237
238 static void action_Isend(const char *const *action)
239 {
240   CHECK_ACTION_PARAMS(action, 2, 1)
241   int to = atoi(action[2]);
242   double size=parse_double(action[3]);
243   double clock = smpi_process()->simulated_elapsed();
244
245   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
246
247   int my_proc_id = Actor::self()->getPid();
248   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
249   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
250                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
251   if (not TRACE_smpi_view_internals())
252     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
253
254   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
255
256   TRACE_smpi_comm_out(my_proc_id);
257
258   get_reqq_self()->push_back(request);
259
260   log_timed_action (action, clock);
261 }
262
263 static void action_recv(const char *const *action) {
264   CHECK_ACTION_PARAMS(action, 2, 1)
265   int from = atoi(action[2]);
266   double size=parse_double(action[3]);
267   double clock = smpi_process()->simulated_elapsed();
268   MPI_Status status;
269
270   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
271
272   int my_proc_id = Actor::self()->getPid();
273   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
274
275   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
276                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
277
278   //unknown size from the receiver point of view
279   if (size <= 0.0) {
280     Request::probe(from, 0, MPI_COMM_WORLD, &status);
281     size=status.count;
282   }
283
284   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
285
286   TRACE_smpi_comm_out(my_proc_id);
287   if (not TRACE_smpi_view_internals()) {
288     TRACE_smpi_recv(src_traced, my_proc_id, 0);
289   }
290
291   log_timed_action (action, clock);
292 }
293
294 static void action_Irecv(const char *const *action)
295 {
296   CHECK_ACTION_PARAMS(action, 2, 1)
297   int from = atoi(action[2]);
298   double size=parse_double(action[3]);
299   double clock = smpi_process()->simulated_elapsed();
300
301   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
302
303   int my_proc_id = Actor::self()->getPid();
304   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
305                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
306   MPI_Status status;
307   //unknow size from the receiver pov
308   if (size <= 0.0) {
309     Request::probe(from, 0, MPI_COMM_WORLD, &status);
310     size = status.count;
311   }
312
313   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
314
315   TRACE_smpi_comm_out(my_proc_id);
316   get_reqq_self()->push_back(request);
317
318   log_timed_action (action, clock);
319 }
320
321 static void action_test(const char* const* action)
322 {
323   CHECK_ACTION_PARAMS(action, 0, 0)
324   double clock = smpi_process()->simulated_elapsed();
325   MPI_Status status;
326
327   MPI_Request request = get_reqq_self()->back();
328   get_reqq_self()->pop_back();
329   //if request is null here, this may mean that a previous test has succeeded
330   //Different times in traced application and replayed version may lead to this
331   //In this case, ignore the extra calls.
332   if(request!=nullptr){
333     int my_proc_id = Actor::self()->getPid();
334     TRACE_smpi_testing_in(my_proc_id);
335
336     int flag = Request::test(&request, &status);
337
338     XBT_DEBUG("MPI_Test result: %d", flag);
339     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
340     get_reqq_self()->push_back(request);
341
342     TRACE_smpi_testing_out(my_proc_id);
343   }
344   log_timed_action (action, clock);
345 }
346
347 static void action_wait(const char *const *action){
348   CHECK_ACTION_PARAMS(action, 0, 0)
349   double clock = smpi_process()->simulated_elapsed();
350   MPI_Status status;
351
352   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
353       xbt_str_join_array(action," "));
354   MPI_Request request = get_reqq_self()->back();
355   get_reqq_self()->pop_back();
356
357   if (request==nullptr){
358     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
359     return;
360   }
361
362   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
363
364   MPI_Group group = request->comm()->group();
365   int src_traced = group->rank(request->src());
366   int dst_traced = group->rank(request->dst());
367   int is_wait_for_receive = (request->flags() & RECV);
368   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
369
370   Request::wait(&request, &status);
371
372   TRACE_smpi_comm_out(rank);
373   if (is_wait_for_receive)
374     TRACE_smpi_recv(src_traced, dst_traced, 0);
375   log_timed_action (action, clock);
376 }
377
378 static void action_waitall(const char *const *action){
379   CHECK_ACTION_PARAMS(action, 0, 0)
380   double clock = smpi_process()->simulated_elapsed();
381   const unsigned int count_requests = get_reqq_self()->size();
382
383   if (count_requests>0) {
384     MPI_Status status[count_requests];
385
386     int my_proc_id_traced = Actor::self()->getPid();
387     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
388                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
389     int recvs_snd[count_requests];
390     int recvs_rcv[count_requests];
391     for (unsigned int i = 0; i < count_requests; i++) {
392       const auto& req = (*get_reqq_self())[i];
393       if (req && (req->flags() & RECV)) {
394         recvs_snd[i] = req->src();
395         recvs_rcv[i] = req->dst();
396       } else
397         recvs_snd[i] = -100;
398    }
399    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
400
401    for (unsigned i = 0; i < count_requests; i++) {
402      if (recvs_snd[i]!=-100)
403        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
404    }
405    TRACE_smpi_comm_out(my_proc_id_traced);
406   }
407   log_timed_action (action, clock);
408 }
409
410 static void action_barrier(const char *const *action){
411   double clock = smpi_process()->simulated_elapsed();
412   int my_proc_id = Actor::self()->getPid();
413   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
414
415   Colls::barrier(MPI_COMM_WORLD);
416
417   TRACE_smpi_comm_out(my_proc_id);
418   log_timed_action (action, clock);
419 }
420
421 static void action_bcast(const char *const *action)
422 {
423   CHECK_ACTION_PARAMS(action, 1, 2)
424   double size = parse_double(action[2]);
425   double clock = smpi_process()->simulated_elapsed();
426   int root     = (action[3]) ? atoi(action[3]) : 0;
427   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
428   MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
429
430   int my_proc_id = Actor::self()->getPid();
431   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
432                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
433                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
434
435   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
436
437   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
438
439   TRACE_smpi_comm_out(my_proc_id);
440   log_timed_action (action, clock);
441 }
442
443 static void action_reduce(const char *const *action)
444 {
445   CHECK_ACTION_PARAMS(action, 2, 2)
446   double comm_size = parse_double(action[2]);
447   double comp_size = parse_double(action[3]);
448   double clock = smpi_process()->simulated_elapsed();
449   int root         = (action[4]) ? atoi(action[4]) : 0;
450
451   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
452
453   int my_proc_id = Actor::self()->getPid();
454   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
455                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
456                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
457
458   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
459   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
460   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
461   smpi_execute_flops(comp_size);
462
463   TRACE_smpi_comm_out(my_proc_id);
464   log_timed_action (action, clock);
465 }
466
467 static void action_allReduce(const char *const *action) {
468   CHECK_ACTION_PARAMS(action, 2, 1)
469   double comm_size = parse_double(action[2]);
470   double comp_size = parse_double(action[3]);
471
472   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
473
474   double clock = smpi_process()->simulated_elapsed();
475   int my_proc_id = Actor::self()->getPid();
476   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
477                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
478
479   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
481   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
482   smpi_execute_flops(comp_size);
483
484   TRACE_smpi_comm_out(my_proc_id);
485   log_timed_action (action, clock);
486 }
487
488 static void action_allToAll(const char *const *action) {
489   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
490   double clock = smpi_process()->simulated_elapsed();
491   int comm_size = MPI_COMM_WORLD->size();
492   int send_size = parse_double(action[2]);
493   int recv_size = parse_double(action[3]);
494   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
495   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
496
497   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
498   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
499
500   int my_proc_id = Actor::self()->getPid();
501   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
502                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
503                                                     encode_datatype(MPI_CURRENT_TYPE),
504                                                     encode_datatype(MPI_CURRENT_TYPE2)));
505
506   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
507
508   TRACE_smpi_comm_out(my_proc_id);
509   log_timed_action (action, clock);
510 }
511
512 static void action_gather(const char *const *action) {
513   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
514         0 gather 68 68 0 0 0
515       where:
516         1) 68 is the sendcounts
517         2) 68 is the recvcounts
518         3) 0 is the root node
519         4) 0 is the send datatype id, see decode_datatype()
520         5) 0 is the recv datatype id, see decode_datatype()
521   */
522   CHECK_ACTION_PARAMS(action, 2, 3)
523   double clock = smpi_process()->simulated_elapsed();
524   int comm_size = MPI_COMM_WORLD->size();
525   int send_size = parse_double(action[2]);
526   int recv_size = parse_double(action[3]);
527   MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
528   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
529
530   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
531   void *recv = nullptr;
532   int root   = (action[4]) ? atoi(action[4]) : 0;
533   int rank = MPI_COMM_WORLD->rank();
534
535   if(rank==root)
536     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
537
538   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
539                                                                         encode_datatype(MPI_CURRENT_TYPE),
540                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
541
542   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
543
544   TRACE_smpi_comm_out(Actor::self()->getPid());
545   log_timed_action (action, clock);
546 }
547
548 static void action_scatter(const char* const* action)
549 {
550   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
551         0 gather 68 68 0 0 0
552       where:
553         1) 68 is the sendcounts
554         2) 68 is the recvcounts
555         3) 0 is the root node
556         4) 0 is the send datatype id, see decode_datatype()
557         5) 0 is the recv datatype id, see decode_datatype()
558   */
559   CHECK_ACTION_PARAMS(action, 2, 3)
560   double clock                   = smpi_process()->simulated_elapsed();
561   int comm_size                  = MPI_COMM_WORLD->size();
562   int send_size                  = parse_double(action[2]);
563   int recv_size                  = parse_double(action[3]);
564   MPI_Datatype MPI_CURRENT_TYPE  = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
565   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
566
567   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
568   void* recv = nullptr;
569   int root   = (action[4]) ? atoi(action[4]) : 0;
570   int rank = MPI_COMM_WORLD->rank();
571
572   if (rank == root)
573     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
574
575   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
576                                                                         encode_datatype(MPI_CURRENT_TYPE),
577                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
578
579   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
580
581   TRACE_smpi_comm_out(Actor::self()->getPid());
582   log_timed_action(action, clock);
583 }
584
585 static void action_gatherv(const char *const *action) {
586   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
587        0 gather 68 68 10 10 10 0 0 0
588      where:
589        1) 68 is the sendcount
590        2) 68 10 10 10 is the recvcounts
591        3) 0 is the root node
592        4) 0 is the send datatype id, see decode_datatype()
593        5) 0 is the recv datatype id, see decode_datatype()
594   */
595   double clock = smpi_process()->simulated_elapsed();
596   int comm_size = MPI_COMM_WORLD->size();
597   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
598   int send_size = parse_double(action[2]);
599   std::vector<int> disps(comm_size, 0);
600   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
601
602   MPI_Datatype MPI_CURRENT_TYPE =
603       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
604   MPI_Datatype MPI_CURRENT_TYPE2{
605       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
606
607   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
608   void *recv = nullptr;
609   for(int i=0;i<comm_size;i++) {
610     (*recvcounts)[i] = atoi(action[i + 3]);
611   }
612   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
613
614   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
615   int rank = MPI_COMM_WORLD->rank();
616
617   if(rank==root)
618     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
619
620   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
621                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
622                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
623
624   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
625                  MPI_COMM_WORLD);
626
627   TRACE_smpi_comm_out(Actor::self()->getPid());
628   log_timed_action (action, clock);
629 }
630
631 static void action_scatterv(const char* const* action)
632 {
633   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
634        0 gather 68 10 10 10 68 0 0 0
635      where:
636        1) 68 10 10 10 is the sendcounts
637        2) 68 is the recvcount
638        3) 0 is the root node
639        4) 0 is the send datatype id, see decode_datatype()
640        5) 0 is the recv datatype id, see decode_datatype()
641   */
642   double clock  = smpi_process()->simulated_elapsed();
643   int comm_size = MPI_COMM_WORLD->size();
644   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
645   int recv_size = parse_double(action[2 + comm_size]);
646   std::vector<int> disps(comm_size, 0);
647   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
648
649   MPI_Datatype MPI_CURRENT_TYPE =
650       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
651   MPI_Datatype MPI_CURRENT_TYPE2{
652       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
653
654   void* send = nullptr;
655   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
656   for (int i = 0; i < comm_size; i++) {
657     (*sendcounts)[i] = atoi(action[i + 2]);
658   }
659   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
660
661   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
662   int rank = MPI_COMM_WORLD->rank();
663
664   if (rank == root)
665     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
666
667   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
668                                                                            nullptr, encode_datatype(MPI_CURRENT_TYPE),
669                                                                            encode_datatype(MPI_CURRENT_TYPE2)));
670
671   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
672                   MPI_COMM_WORLD);
673
674   TRACE_smpi_comm_out(Actor::self()->getPid());
675   log_timed_action(action, clock);
676 }
677
678 static void action_reducescatter(const char *const *action) {
679  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
680       0 reduceScatter 275427 275427 275427 204020 11346849 0
681     where:
682       1) The first four values after the name of the action declare the recvcounts array
683       2) The value 11346849 is the amount of instructions
684       3) The last value corresponds to the datatype, see decode_datatype().
685 */
686   double clock = smpi_process()->simulated_elapsed();
687   int comm_size = MPI_COMM_WORLD->size();
688   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
689   int comp_size = parse_double(action[2+comm_size]);
690   int my_proc_id                     = Actor::self()->getPid();
691   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
692   MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
693
694   for(int i=0;i<comm_size;i++) {
695     recvcounts->push_back(atoi(action[i + 2]));
696   }
697   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
698
699   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
700                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
701                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
702                                                        encode_datatype(MPI_CURRENT_TYPE)));
703
704   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
705   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
706
707   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
708   smpi_execute_flops(comp_size);
709
710   TRACE_smpi_comm_out(my_proc_id);
711   log_timed_action (action, clock);
712 }
713
714 static void action_allgather(const char *const *action) {
715   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
716         0 allGather 275427 275427
717     where:
718         1) 275427 is the sendcount
719         2) 275427 is the recvcount
720         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
721   */
722   double clock = smpi_process()->simulated_elapsed();
723
724   CHECK_ACTION_PARAMS(action, 2, 2)
725   int sendcount=atoi(action[2]);
726   int recvcount=atoi(action[3]);
727
728   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
729   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
730
731   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
732   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
733
734   int my_proc_id = Actor::self()->getPid();
735
736   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
737                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
738                                                     encode_datatype(MPI_CURRENT_TYPE),
739                                                     encode_datatype(MPI_CURRENT_TYPE2)));
740
741   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
742
743   TRACE_smpi_comm_out(my_proc_id);
744   log_timed_action (action, clock);
745 }
746
747 static void action_allgatherv(const char *const *action) {
748   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
749         0 allGatherV 275427 275427 275427 275427 204020
750      where:
751         1) 275427 is the sendcount
752         2) The next four elements declare the recvcounts array
753         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
754   */
755   double clock = smpi_process()->simulated_elapsed();
756
757   int comm_size = MPI_COMM_WORLD->size();
758   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
759   int sendcount=atoi(action[2]);
760   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
761   std::vector<int> disps(comm_size, 0);
762
763   MPI_Datatype MPI_CURRENT_TYPE =
764       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
765   MPI_Datatype MPI_CURRENT_TYPE2{
766       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
767
768   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
769
770   for(int i=0;i<comm_size;i++) {
771     (*recvcounts)[i] = atoi(action[i + 3]);
772   }
773   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
774   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
775
776   int my_proc_id = Actor::self()->getPid();
777
778   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
779                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
780                                                        encode_datatype(MPI_CURRENT_TYPE),
781                                                        encode_datatype(MPI_CURRENT_TYPE2)));
782
783   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
784                     MPI_COMM_WORLD);
785
786   TRACE_smpi_comm_out(my_proc_id);
787   log_timed_action (action, clock);
788 }
789
790 static void action_allToAllv(const char *const *action) {
791   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
792         0 allToAllV 100 1 7 10 12 100 1 70 10 5
793      where:
794         1) 100 is the size of the send buffer *sizeof(int),
795         2) 1 7 10 12 is the sendcounts array
796         3) 100*sizeof(int) is the size of the receiver buffer
797         4)  1 70 10 5 is the recvcounts array
798   */
799   double clock = smpi_process()->simulated_elapsed();
800
801   int comm_size = MPI_COMM_WORLD->size();
802   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
803   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
804   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
805   std::vector<int> senddisps(comm_size, 0);
806   std::vector<int> recvdisps(comm_size, 0);
807
808   MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
809                                       ? decode_datatype(action[4 + 2 * comm_size])
810                                       : MPI_DEFAULT_TYPE;
811   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
812                                      ? decode_datatype(action[5 + 2 * comm_size])
813                                      : MPI_DEFAULT_TYPE};
814
815   int send_buf_size=parse_double(action[2]);
816   int recv_buf_size=parse_double(action[3+comm_size]);
817   int my_proc_id = Actor::self()->getPid();
818   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
819   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
820
821   for(int i=0;i<comm_size;i++) {
822     (*sendcounts)[i] = atoi(action[3 + i]);
823     (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
824   }
825   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
826   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
827
828   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
829                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
830                                                        encode_datatype(MPI_CURRENT_TYPE),
831                                                        encode_datatype(MPI_CURRENT_TYPE2)));
832
833   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
834                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
835
836   TRACE_smpi_comm_out(my_proc_id);
837   log_timed_action (action, clock);
838 }
839
840 }} // namespace simgrid::smpi
841
842 /** @brief Only initialize the replay, don't do it for real */
843 void smpi_replay_init(int* argc, char*** argv)
844 {
845   simgrid::smpi::Process::init(argc, argv);
846   smpi_process()->mark_as_initialized();
847   smpi_process()->set_replaying(true);
848
849   int my_proc_id = Actor::self()->getPid();
850   TRACE_smpi_init(my_proc_id);
851   TRACE_smpi_computing_init(my_proc_id);
852   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
853   TRACE_smpi_comm_out(my_proc_id);
854   xbt_replay_action_register("init",       simgrid::smpi::action_init);
855   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
856   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
857   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
858   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
859   xbt_replay_action_register("send",       simgrid::smpi::action_send);
860   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
861   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
862   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
863   xbt_replay_action_register("test",       simgrid::smpi::action_test);
864   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
865   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
866   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
867   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
868   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
869   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
870   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
871   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
872   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
873   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
874   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
875   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
876   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
877   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
878   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
879   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
880
881   //if we have a delayed start, sleep here.
882   if(*argc>2){
883     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
884     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
885     smpi_execute_flops(value);
886   } else {
887     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
888     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
889     smpi_execute_flops(0.0);
890   }
891 }
892
893 /** @brief actually run the replay after initialization */
894 void smpi_replay_main(int* argc, char*** argv)
895 {
896   simgrid::xbt::replay_runner(*argc, *argv);
897
898   /* and now, finalize everything */
899   /* One active process will stop. Decrease the counter*/
900   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
901   if (not get_reqq_self()->empty()) {
902     unsigned int count_requests=get_reqq_self()->size();
903     MPI_Request requests[count_requests];
904     MPI_Status status[count_requests];
905     unsigned int i=0;
906
907     for (auto const& req : *get_reqq_self()) {
908       requests[i] = req;
909       i++;
910     }
911     simgrid::smpi::Request::waitall(count_requests, requests, status);
912   }
913   delete get_reqq_self();
914   active_processes--;
915
916   if(active_processes==0){
917     /* Last process alive speaking: end the simulated timer */
918     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
919     xbt_free(sendbuffer);
920     xbt_free(recvbuffer);
921   }
922
923   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
924
925   smpi_process()->finalize();
926
927   TRACE_smpi_comm_out(Actor::self()->getPid());
928   TRACE_smpi_finalize(Actor::self()->getPid());
929 }
930
931 /** @brief chain a replay initialization and a replay start */
932 void smpi_replay_run(int* argc, char*** argv)
933 {
934   smpi_replay_init(argc, argv);
935   smpi_replay_main(argc, argv);
936 }