Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4be3f61a3e4bb58634f2894c43468eb2d1398d0e
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29 static MPI_Datatype MPI_CURRENT_TYPE;
30
31 static int sendbuffer_size = 0;
32 static char* sendbuffer    = nullptr;
33 static int recvbuffer_size = 0;
34 static char* recvbuffer    = nullptr;
35
36 static void log_timed_action (const char *const *action, double clock){
37   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38     char *name = xbt_str_join_array(action, " ");
39     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
40     xbt_free(name);
41   }
42 }
43
44 static std::vector<MPI_Request>* get_reqq_self()
45 {
46   return reqq.at(Actor::self()->getPid());
47 }
48
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 {
51    reqq.insert({Actor::self()->getPid(), mpi_request});
52 }
53
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
56 {
57   if (not smpi_process()->replaying())
58     return xbt_malloc(size);
59   if (sendbuffer_size<size){
60     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
61     sendbuffer_size=size;
62   }
63   return sendbuffer;
64 }
65
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (not smpi_process()->replaying())
69     return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
72     recvbuffer_size=size;
73   }
74   return recvbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (not smpi_process()->replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   char *endptr;
86   double value = strtod(string, &endptr);
87   if (*endptr != '\0')
88     THROWF(unknown_error, 0, "%s is not a double", string);
89   return value;
90 }
91
92
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
94 static MPI_Datatype decode_datatype(const char *const action)
95 {
96   switch(atoi(action)) {
97     case 0:
98       return MPI_DOUBLE;
99       break;
100     case 1:
101       return MPI_INT;
102       break;
103     case 2:
104       return MPI_CHAR;
105       break;
106     case 3:
107       return MPI_SHORT;
108       break;
109     case 4:
110       return MPI_LONG;
111       break;
112     case 5:
113       return MPI_FLOAT;
114       break;
115     case 6:
116       return MPI_BYTE;
117       break;
118     default:
119       return MPI_DEFAULT_TYPE;
120       break;
121   }
122 }
123
124 const char* encode_datatype(MPI_Datatype datatype)
125 {
126   if (datatype==MPI_BYTE)
127       return "";
128   if(datatype==MPI_DOUBLE)
129       return "0";
130   if(datatype==MPI_INT)
131       return "1";
132   if(datatype==MPI_CHAR)
133       return "2";
134   if(datatype==MPI_SHORT)
135       return "3";
136   if(datatype==MPI_LONG)
137     return "4";
138   if(datatype==MPI_FLOAT)
139       return "5";
140   // default - not implemented.
141   // do not warn here as we pass in this function even for other trace formats
142   return "-1";
143 }
144
145 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
146     int i=0;\
147     while(action[i]!=nullptr)\
148      i++;\
149     if(i<mandatory+2)                                           \
150     THROWF(arg_error, 0, "%s replay failed.\n" \
151           "%d items were given on the line. First two should be process_id and action.  " \
152           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
153           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
154   }
155
156 namespace simgrid {
157 namespace smpi {
158
159 static void action_init(const char *const *action)
160 {
161   XBT_DEBUG("Initialize the counters");
162   CHECK_ACTION_PARAMS(action, 0, 1)
163   if(action[2])
164     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
165   else
166     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
167
168   /* start a simulated timer */
169   smpi_process()->simulated_start();
170   /*initialize the number of active processes */
171   active_processes = smpi_process_count();
172
173   set_reqq_self(new std::vector<MPI_Request>);
174 }
175
176 static void action_finalize(const char *const *action)
177 {
178   /* Nothing to do */
179 }
180
181 static void action_comm_size(const char *const *action)
182 {
183   communicator_size = parse_double(action[2]);
184   log_timed_action (action, smpi_process()->simulated_elapsed());
185 }
186
187 static void action_comm_split(const char *const *action)
188 {
189   log_timed_action (action, smpi_process()->simulated_elapsed());
190 }
191
192 static void action_comm_dup(const char *const *action)
193 {
194   log_timed_action (action, smpi_process()->simulated_elapsed());
195 }
196
197 static void action_compute(const char *const *action)
198 {
199   CHECK_ACTION_PARAMS(action, 1, 0)
200   double clock = smpi_process()->simulated_elapsed();
201   double flops= parse_double(action[2]);
202   int my_proc_id = Actor::self()->getPid();
203
204   TRACE_smpi_computing_in(my_proc_id, flops);
205   smpi_execute_flops(flops);
206   TRACE_smpi_computing_out(my_proc_id);
207
208   log_timed_action (action, clock);
209 }
210
211 static void action_send(const char *const *action)
212 {
213   CHECK_ACTION_PARAMS(action, 2, 1)
214   int to = atoi(action[2]);
215   double size=parse_double(action[3]);
216   double clock = smpi_process()->simulated_elapsed();
217
218   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
219
220   int my_proc_id = Actor::self()->getPid();
221   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
222
223   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
224                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
225   if (not TRACE_smpi_view_internals())
226     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
227
228   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
229
230   TRACE_smpi_comm_out(my_proc_id);
231
232   log_timed_action(action, clock);
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   CHECK_ACTION_PARAMS(action, 2, 1)
238   int to = atoi(action[2]);
239   double size=parse_double(action[3]);
240   double clock = smpi_process()->simulated_elapsed();
241
242   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
243
244   int my_proc_id = Actor::self()->getPid();
245   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
246   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
247                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
248   if (not TRACE_smpi_view_internals())
249     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
250
251   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
252
253   TRACE_smpi_comm_out(my_proc_id);
254
255   get_reqq_self()->push_back(request);
256
257   log_timed_action (action, clock);
258 }
259
260 static void action_recv(const char *const *action) {
261   CHECK_ACTION_PARAMS(action, 2, 1)
262   int from = atoi(action[2]);
263   double size=parse_double(action[3]);
264   double clock = smpi_process()->simulated_elapsed();
265   MPI_Status status;
266
267   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
268
269   int my_proc_id = Actor::self()->getPid();
270   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
271
272   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
273                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
274
275   //unknown size from the receiver point of view
276   if (size <= 0.0) {
277     Request::probe(from, 0, MPI_COMM_WORLD, &status);
278     size=status.count;
279   }
280
281   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
282
283   TRACE_smpi_comm_out(my_proc_id);
284   if (not TRACE_smpi_view_internals()) {
285     TRACE_smpi_recv(src_traced, my_proc_id, 0);
286   }
287
288   log_timed_action (action, clock);
289 }
290
291 static void action_Irecv(const char *const *action)
292 {
293   CHECK_ACTION_PARAMS(action, 2, 1)
294   int from = atoi(action[2]);
295   double size=parse_double(action[3]);
296   double clock = smpi_process()->simulated_elapsed();
297
298   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
299
300   int my_proc_id = Actor::self()->getPid();
301   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
302                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
303   MPI_Status status;
304   //unknow size from the receiver pov
305   if (size <= 0.0) {
306     Request::probe(from, 0, MPI_COMM_WORLD, &status);
307     size = status.count;
308   }
309
310   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
311
312   TRACE_smpi_comm_out(my_proc_id);
313   get_reqq_self()->push_back(request);
314
315   log_timed_action (action, clock);
316 }
317
318 static void action_test(const char* const* action)
319 {
320   CHECK_ACTION_PARAMS(action, 0, 0)
321   double clock = smpi_process()->simulated_elapsed();
322   MPI_Status status;
323
324   MPI_Request request = get_reqq_self()->back();
325   get_reqq_self()->pop_back();
326   //if request is null here, this may mean that a previous test has succeeded
327   //Different times in traced application and replayed version may lead to this
328   //In this case, ignore the extra calls.
329   if(request!=nullptr){
330     int my_proc_id = Actor::self()->getPid();
331     TRACE_smpi_testing_in(my_proc_id);
332
333     int flag = Request::test(&request, &status);
334
335     XBT_DEBUG("MPI_Test result: %d", flag);
336     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
337     get_reqq_self()->push_back(request);
338
339     TRACE_smpi_testing_out(my_proc_id);
340   }
341   log_timed_action (action, clock);
342 }
343
344 static void action_wait(const char *const *action){
345   CHECK_ACTION_PARAMS(action, 0, 0)
346   double clock = smpi_process()->simulated_elapsed();
347   MPI_Status status;
348
349   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
350       xbt_str_join_array(action," "));
351   MPI_Request request = get_reqq_self()->back();
352   get_reqq_self()->pop_back();
353
354   if (request==nullptr){
355     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
356     return;
357   }
358
359   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
360
361   MPI_Group group = request->comm()->group();
362   int src_traced = group->rank(request->src());
363   int dst_traced = group->rank(request->dst());
364   int is_wait_for_receive = (request->flags() & RECV);
365   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
366
367   Request::wait(&request, &status);
368
369   TRACE_smpi_comm_out(rank);
370   if (is_wait_for_receive)
371     TRACE_smpi_recv(src_traced, dst_traced, 0);
372   log_timed_action (action, clock);
373 }
374
375 static void action_waitall(const char *const *action){
376   CHECK_ACTION_PARAMS(action, 0, 0)
377   double clock = smpi_process()->simulated_elapsed();
378   const unsigned int count_requests = get_reqq_self()->size();
379
380   if (count_requests>0) {
381     MPI_Status status[count_requests];
382
383     int my_proc_id_traced = Actor::self()->getPid();
384     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
385                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
386     int recvs_snd[count_requests];
387     int recvs_rcv[count_requests];
388     for (unsigned int i = 0; i < count_requests; i++) {
389       const auto& req = (*get_reqq_self())[i];
390       if (req && (req->flags() & RECV)) {
391         recvs_snd[i] = req->src();
392         recvs_rcv[i] = req->dst();
393       } else
394         recvs_snd[i] = -100;
395    }
396    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
397
398    for (unsigned i = 0; i < count_requests; i++) {
399      if (recvs_snd[i]!=-100)
400        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
401    }
402    TRACE_smpi_comm_out(my_proc_id_traced);
403   }
404   log_timed_action (action, clock);
405 }
406
407 static void action_barrier(const char *const *action){
408   double clock = smpi_process()->simulated_elapsed();
409   int my_proc_id = Actor::self()->getPid();
410   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
411
412   Colls::barrier(MPI_COMM_WORLD);
413
414   TRACE_smpi_comm_out(my_proc_id);
415   log_timed_action (action, clock);
416 }
417
418 static void action_bcast(const char *const *action)
419 {
420   CHECK_ACTION_PARAMS(action, 1, 2)
421   double size = parse_double(action[2]);
422   double clock = smpi_process()->simulated_elapsed();
423   int root     = (action[3]) ? atoi(action[3]) : 0;
424   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
425   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
426
427   int my_proc_id = Actor::self()->getPid();
428   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
430                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
431
432   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
433
434   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
435
436   TRACE_smpi_comm_out(my_proc_id);
437   log_timed_action (action, clock);
438 }
439
440 static void action_reduce(const char *const *action)
441 {
442   CHECK_ACTION_PARAMS(action, 2, 2)
443   double comm_size = parse_double(action[2]);
444   double comp_size = parse_double(action[3]);
445   double clock = smpi_process()->simulated_elapsed();
446   int root         = (action[4]) ? atoi(action[4]) : 0;
447
448   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
449
450   int my_proc_id = Actor::self()->getPid();
451   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
452                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
453                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
454
455   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
458   smpi_execute_flops(comp_size);
459
460   TRACE_smpi_comm_out(my_proc_id);
461   log_timed_action (action, clock);
462 }
463
464 static void action_allReduce(const char *const *action) {
465   CHECK_ACTION_PARAMS(action, 2, 1)
466   double comm_size = parse_double(action[2]);
467   double comp_size = parse_double(action[3]);
468
469   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
470
471   double clock = smpi_process()->simulated_elapsed();
472   int my_proc_id = Actor::self()->getPid();
473   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
474                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
475
476   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
479   smpi_execute_flops(comp_size);
480
481   TRACE_smpi_comm_out(my_proc_id);
482   log_timed_action (action, clock);
483 }
484
485 static void action_allToAll(const char *const *action) {
486   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
487   double clock = smpi_process()->simulated_elapsed();
488   int comm_size = MPI_COMM_WORLD->size();
489   int send_size = parse_double(action[2]);
490   int recv_size = parse_double(action[3]);
491   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
492   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
493
494   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
495   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
496
497   int my_proc_id = Actor::self()->getPid();
498   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
499                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
500                                                     encode_datatype(MPI_CURRENT_TYPE),
501                                                     encode_datatype(MPI_CURRENT_TYPE2)));
502
503   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
504
505   TRACE_smpi_comm_out(my_proc_id);
506   log_timed_action (action, clock);
507 }
508
509 static void action_gather(const char *const *action) {
510   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
511         0 gather 68 68 0 0 0
512       where:
513         1) 68 is the sendcounts
514         2) 68 is the recvcounts
515         3) 0 is the root node
516         4) 0 is the send datatype id, see decode_datatype()
517         5) 0 is the recv datatype id, see decode_datatype()
518   */
519   CHECK_ACTION_PARAMS(action, 2, 3)
520   double clock = smpi_process()->simulated_elapsed();
521   int comm_size = MPI_COMM_WORLD->size();
522   int send_size = parse_double(action[2]);
523   int recv_size = parse_double(action[3]);
524   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
525   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
526
527   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528   void *recv = nullptr;
529   int root   = (action[4]) ? atoi(action[4]) : 0;
530   int rank = MPI_COMM_WORLD->rank();
531
532   if(rank==root)
533     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
534
535   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
536                                                                         encode_datatype(MPI_CURRENT_TYPE),
537                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
538
539   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
540
541   TRACE_smpi_comm_out(Actor::self()->getPid());
542   log_timed_action (action, clock);
543 }
544
545 static void action_scatter(const char* const* action)
546 {
547   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
548         0 gather 68 68 0 0 0
549       where:
550         1) 68 is the sendcounts
551         2) 68 is the recvcounts
552         3) 0 is the root node
553         4) 0 is the send datatype id, see decode_datatype()
554         5) 0 is the recv datatype id, see decode_datatype()
555   */
556   CHECK_ACTION_PARAMS(action, 2, 3)
557   double clock                   = smpi_process()->simulated_elapsed();
558   int comm_size                  = MPI_COMM_WORLD->size();
559   int send_size                  = parse_double(action[2]);
560   int recv_size                  = parse_double(action[3]);
561   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
562   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
563
564   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
565   void* recv = nullptr;
566   int root   = (action[4]) ? atoi(action[4]) : 0;
567   int rank = MPI_COMM_WORLD->rank();
568
569   if (rank == root)
570     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
571
572   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
573                                                                         encode_datatype(MPI_CURRENT_TYPE),
574                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
575
576   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
577
578   TRACE_smpi_comm_out(Actor::self()->getPid());
579   log_timed_action(action, clock);
580 }
581
582 static void action_gatherv(const char *const *action) {
583   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
584        0 gather 68 68 10 10 10 0 0 0
585      where:
586        1) 68 is the sendcount
587        2) 68 10 10 10 is the recvcounts
588        3) 0 is the root node
589        4) 0 is the send datatype id, see decode_datatype()
590        5) 0 is the recv datatype id, see decode_datatype()
591   */
592   double clock = smpi_process()->simulated_elapsed();
593   int comm_size = MPI_COMM_WORLD->size();
594   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
595   int send_size = parse_double(action[2]);
596   std::vector<int> disps(comm_size, 0);
597   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
598
599   MPI_CURRENT_TYPE =
600       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
601   MPI_Datatype MPI_CURRENT_TYPE2{
602       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
603
604   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
605   void *recv = nullptr;
606   for(int i=0;i<comm_size;i++) {
607     (*recvcounts)[i] = atoi(action[i + 3]);
608   }
609   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
610
611   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
612   int rank = MPI_COMM_WORLD->rank();
613
614   if(rank==root)
615     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
616
617   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
618                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
619                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
620
621   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
622                  MPI_COMM_WORLD);
623
624   TRACE_smpi_comm_out(Actor::self()->getPid());
625   log_timed_action (action, clock);
626 }
627
628 static void action_scatterv(const char* const* action)
629 {
630   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
631        0 gather 68 10 10 10 68 0 0 0
632      where:
633        1) 68 10 10 10 is the sendcounts
634        2) 68 is the recvcount
635        3) 0 is the root node
636        4) 0 is the send datatype id, see decode_datatype()
637        5) 0 is the recv datatype id, see decode_datatype()
638   */
639   double clock  = smpi_process()->simulated_elapsed();
640   int comm_size = MPI_COMM_WORLD->size();
641   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
642   int recv_size = parse_double(action[2 + comm_size]);
643   std::vector<int> disps(comm_size, 0);
644   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
645
646   MPI_CURRENT_TYPE =
647       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
648   MPI_Datatype MPI_CURRENT_TYPE2{
649       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
650
651   void* send = nullptr;
652   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
653   for (int i = 0; i < comm_size; i++) {
654     (*sendcounts)[i] = atoi(action[i + 2]);
655   }
656   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
657
658   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
659   int rank = MPI_COMM_WORLD->rank();
660
661   if (rank == root)
662     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
663
664   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
665                                                                            nullptr, encode_datatype(MPI_CURRENT_TYPE),
666                                                                            encode_datatype(MPI_CURRENT_TYPE2)));
667
668   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
669                   MPI_COMM_WORLD);
670
671   TRACE_smpi_comm_out(Actor::self()->getPid());
672   log_timed_action(action, clock);
673 }
674
675 static void action_reducescatter(const char *const *action) {
676  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
677       0 reduceScatter 275427 275427 275427 204020 11346849 0
678     where:
679       1) The first four values after the name of the action declare the recvcounts array
680       2) The value 11346849 is the amount of instructions
681       3) The last value corresponds to the datatype, see decode_datatype().
682 */
683   double clock = smpi_process()->simulated_elapsed();
684   int comm_size = MPI_COMM_WORLD->size();
685   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
686   int comp_size = parse_double(action[2+comm_size]);
687   int my_proc_id                     = Actor::self()->getPid();
688   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
689   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
690
691   for(int i=0;i<comm_size;i++) {
692     recvcounts->push_back(atoi(action[i + 2]));
693   }
694   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
695
696   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
697                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
698                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
699                                                        encode_datatype(MPI_CURRENT_TYPE)));
700
701   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
702   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
703
704   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
705   smpi_execute_flops(comp_size);
706
707   TRACE_smpi_comm_out(my_proc_id);
708   log_timed_action (action, clock);
709 }
710
711 static void action_allgather(const char *const *action) {
712   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
713         0 allGather 275427 275427
714     where:
715         1) 275427 is the sendcount
716         2) 275427 is the recvcount
717         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
718   */
719   double clock = smpi_process()->simulated_elapsed();
720
721   CHECK_ACTION_PARAMS(action, 2, 2)
722   int sendcount=atoi(action[2]);
723   int recvcount=atoi(action[3]);
724
725   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
726   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
727
728   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
729   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
730
731   int my_proc_id = Actor::self()->getPid();
732
733   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
734                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
735                                                     encode_datatype(MPI_CURRENT_TYPE),
736                                                     encode_datatype(MPI_CURRENT_TYPE2)));
737
738   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
739
740   TRACE_smpi_comm_out(my_proc_id);
741   log_timed_action (action, clock);
742 }
743
744 static void action_allgatherv(const char *const *action) {
745   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
746         0 allGatherV 275427 275427 275427 275427 204020
747      where:
748         1) 275427 is the sendcount
749         2) The next four elements declare the recvcounts array
750         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
751   */
752   double clock = smpi_process()->simulated_elapsed();
753
754   int comm_size = MPI_COMM_WORLD->size();
755   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
756   int sendcount=atoi(action[2]);
757   int recvcounts[comm_size];
758   std::vector<int> disps(comm_size, 0);
759   int recv_sum=0;
760
761   MPI_CURRENT_TYPE =
762       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
763   MPI_Datatype MPI_CURRENT_TYPE2{
764       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
765
766   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
767
768   for(int i=0;i<comm_size;i++) {
769     recvcounts[i] = atoi(action[i+3]);
770     recv_sum=recv_sum+recvcounts[i];
771   }
772   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
773
774   int my_proc_id = Actor::self()->getPid();
775
776   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
777
778   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
779                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
780                                                        encode_datatype(MPI_CURRENT_TYPE),
781                                                        encode_datatype(MPI_CURRENT_TYPE2)));
782
783   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
784                           MPI_COMM_WORLD);
785
786   TRACE_smpi_comm_out(my_proc_id);
787   log_timed_action (action, clock);
788 }
789
790 static void action_allToAllv(const char *const *action) {
791   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
792         0 allToAllV 100 1 7 10 12 100 1 70 10 5
793      where:
794         1) 100 is the size of the send buffer *sizeof(int),
795         2) 1 7 10 12 is the sendcounts array
796         3) 100*sizeof(int) is the size of the receiver buffer
797         4)  1 70 10 5 is the recvcounts array
798   */
799   double clock = smpi_process()->simulated_elapsed();
800
801   int comm_size = MPI_COMM_WORLD->size();
802   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
803   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
804   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
805   std::vector<int> senddisps(comm_size, 0);
806   std::vector<int> recvdisps(comm_size, 0);
807
808   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
809                          ? decode_datatype(action[4 + 2 * comm_size])
810                          : MPI_DEFAULT_TYPE;
811   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
812                                      ? decode_datatype(action[5 + 2 * comm_size])
813                                      : MPI_DEFAULT_TYPE};
814
815   int send_buf_size=parse_double(action[2]);
816   int recv_buf_size=parse_double(action[3+comm_size]);
817   int my_proc_id = Actor::self()->getPid();
818   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
819   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
820
821   for(int i=0;i<comm_size;i++) {
822     (*sendcounts)[i] = atoi(action[3 + i]);
823     (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
824   }
825   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
826   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
827
828   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
829                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
830                                                        encode_datatype(MPI_CURRENT_TYPE),
831                                                        encode_datatype(MPI_CURRENT_TYPE2)));
832
833   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
834                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
835
836   TRACE_smpi_comm_out(my_proc_id);
837   log_timed_action (action, clock);
838 }
839
840 }} // namespace simgrid::smpi
841
842 /** @brief Only initialize the replay, don't do it for real */
843 void smpi_replay_init(int* argc, char*** argv)
844 {
845   simgrid::smpi::Process::init(argc, argv);
846   smpi_process()->mark_as_initialized();
847   smpi_process()->set_replaying(true);
848
849   int my_proc_id = Actor::self()->getPid();
850   TRACE_smpi_init(my_proc_id);
851   TRACE_smpi_computing_init(my_proc_id);
852   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
853   TRACE_smpi_comm_out(my_proc_id);
854   xbt_replay_action_register("init",       simgrid::smpi::action_init);
855   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
856   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
857   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
858   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
859   xbt_replay_action_register("send",       simgrid::smpi::action_send);
860   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
861   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
862   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
863   xbt_replay_action_register("test",       simgrid::smpi::action_test);
864   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
865   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
866   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
867   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
868   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
869   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
870   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
871   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
872   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
873   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
874   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
875   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
876   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
877   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
878   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
879   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
880
881   //if we have a delayed start, sleep here.
882   if(*argc>2){
883     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
884     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
885     smpi_execute_flops(value);
886   } else {
887     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
888     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
889     smpi_execute_flops(0.0);
890   }
891 }
892
893 /** @brief actually run the replay after initialization */
894 void smpi_replay_main(int* argc, char*** argv)
895 {
896   simgrid::xbt::replay_runner(*argc, *argv);
897
898   /* and now, finalize everything */
899   /* One active process will stop. Decrease the counter*/
900   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
901   if (not get_reqq_self()->empty()) {
902     unsigned int count_requests=get_reqq_self()->size();
903     MPI_Request requests[count_requests];
904     MPI_Status status[count_requests];
905     unsigned int i=0;
906
907     for (auto const& req : *get_reqq_self()) {
908       requests[i] = req;
909       i++;
910     }
911     simgrid::smpi::Request::waitall(count_requests, requests, status);
912   }
913   delete get_reqq_self();
914   active_processes--;
915
916   if(active_processes==0){
917     /* Last process alive speaking: end the simulated timer */
918     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
919     xbt_free(sendbuffer);
920     xbt_free(recvbuffer);
921   }
922
923   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
924
925   smpi_process()->finalize();
926
927   TRACE_smpi_comm_out(Actor::self()->getPid());
928   TRACE_smpi_finalize(Actor::self()->getPid());
929 }
930
931 /** @brief chain a replay initialization and a replay start */
932 void smpi_replay_run(int* argc, char*** argv)
933 {
934   smpi_replay_init(argc, argv);
935   smpi_replay_main(argc, argv);
936 }