Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
74c51fe37512c956e0ac9465a1d8ba1233361b5c
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29 static MPI_Datatype MPI_CURRENT_TYPE;
30
31 static int sendbuffer_size = 0;
32 static char* sendbuffer    = nullptr;
33 static int recvbuffer_size = 0;
34 static char* recvbuffer    = nullptr;
35
36 static void log_timed_action (const char *const *action, double clock){
37   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38     char *name = xbt_str_join_array(action, " ");
39     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
40     xbt_free(name);
41   }
42 }
43
44 static std::vector<MPI_Request>* get_reqq_self()
45 {
46   return reqq.at(Actor::self()->getPid());
47 }
48
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 {
51    reqq.insert({Actor::self()->getPid(), mpi_request});
52 }
53
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
56 {
57   if (not smpi_process()->replaying())
58     return xbt_malloc(size);
59   if (sendbuffer_size<size){
60     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
61     sendbuffer_size=size;
62   }
63   return sendbuffer;
64 }
65
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (not smpi_process()->replaying())
69     return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
72     recvbuffer_size=size;
73   }
74   return recvbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (not smpi_process()->replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   char *endptr;
86   double value = strtod(string, &endptr);
87   if (*endptr != '\0')
88     THROWF(unknown_error, 0, "%s is not a double", string);
89   return value;
90 }
91
92
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
94 static MPI_Datatype decode_datatype(const char *const action)
95 {
96   switch(atoi(action)) {
97     case 0:
98       return MPI_DOUBLE;
99       break;
100     case 1:
101       return MPI_INT;
102       break;
103     case 2:
104       return MPI_CHAR;
105       break;
106     case 3:
107       return MPI_SHORT;
108       break;
109     case 4:
110       return MPI_LONG;
111       break;
112     case 5:
113       return MPI_FLOAT;
114       break;
115     case 6:
116       return MPI_BYTE;
117       break;
118     default:
119       return MPI_DEFAULT_TYPE;
120       break;
121   }
122 }
123
124 const char* encode_datatype(MPI_Datatype datatype)
125 {
126   if (datatype==MPI_BYTE)
127       return "";
128   if(datatype==MPI_DOUBLE)
129       return "0";
130   if(datatype==MPI_INT)
131       return "1";
132   if(datatype==MPI_CHAR)
133       return "2";
134   if(datatype==MPI_SHORT)
135       return "3";
136   if(datatype==MPI_LONG)
137     return "4";
138   if(datatype==MPI_FLOAT)
139       return "5";
140   // default - not implemented.
141   // do not warn here as we pass in this function even for other trace formats
142   return "-1";
143 }
144
145 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
146     int i=0;\
147     while(action[i]!=nullptr)\
148      i++;\
149     if(i<mandatory+2)                                           \
150     THROWF(arg_error, 0, "%s replay failed.\n" \
151           "%d items were given on the line. First two should be process_id and action.  " \
152           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
153           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
154   }
155
156 namespace simgrid {
157 namespace smpi {
158
159 static void action_init(const char *const *action)
160 {
161   XBT_DEBUG("Initialize the counters");
162   CHECK_ACTION_PARAMS(action, 0, 1)
163   if(action[2])
164     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
165   else
166     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
167
168   /* start a simulated timer */
169   smpi_process()->simulated_start();
170   /*initialize the number of active processes */
171   active_processes = smpi_process_count();
172
173   set_reqq_self(new std::vector<MPI_Request>);
174 }
175
176 static void action_finalize(const char *const *action)
177 {
178   /* Nothing to do */
179 }
180
181 static void action_comm_size(const char *const *action)
182 {
183   communicator_size = parse_double(action[2]);
184   log_timed_action (action, smpi_process()->simulated_elapsed());
185 }
186
187 static void action_comm_split(const char *const *action)
188 {
189   log_timed_action (action, smpi_process()->simulated_elapsed());
190 }
191
192 static void action_comm_dup(const char *const *action)
193 {
194   log_timed_action (action, smpi_process()->simulated_elapsed());
195 }
196
197 static void action_compute(const char *const *action)
198 {
199   CHECK_ACTION_PARAMS(action, 1, 0)
200   double clock = smpi_process()->simulated_elapsed();
201   double flops= parse_double(action[2]);
202   int my_proc_id = Actor::self()->getPid();
203
204   TRACE_smpi_computing_in(my_proc_id, flops);
205   smpi_execute_flops(flops);
206   TRACE_smpi_computing_out(my_proc_id);
207
208   log_timed_action (action, clock);
209 }
210
211 static void action_send(const char *const *action)
212 {
213   CHECK_ACTION_PARAMS(action, 2, 1)
214   int to = atoi(action[2]);
215   double size=parse_double(action[3]);
216   double clock = smpi_process()->simulated_elapsed();
217
218   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
219
220   int my_proc_id = Actor::self()->getPid();
221   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
222
223   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
224                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
225   if (not TRACE_smpi_view_internals())
226     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
227
228   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
229
230   TRACE_smpi_comm_out(my_proc_id);
231
232   log_timed_action(action, clock);
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   CHECK_ACTION_PARAMS(action, 2, 1)
238   int to = atoi(action[2]);
239   double size=parse_double(action[3]);
240   double clock = smpi_process()->simulated_elapsed();
241
242   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
243
244   int my_proc_id = Actor::self()->getPid();
245   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
246   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
247                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
248   if (not TRACE_smpi_view_internals())
249     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
250
251   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
252
253   TRACE_smpi_comm_out(my_proc_id);
254
255   get_reqq_self()->push_back(request);
256
257   log_timed_action (action, clock);
258 }
259
260 static void action_recv(const char *const *action) {
261   CHECK_ACTION_PARAMS(action, 2, 1)
262   int from = atoi(action[2]);
263   double size=parse_double(action[3]);
264   double clock = smpi_process()->simulated_elapsed();
265   MPI_Status status;
266
267   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
268
269   int my_proc_id = Actor::self()->getPid();
270   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
271
272   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
273                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
274
275   //unknown size from the receiver point of view
276   if (size <= 0.0) {
277     Request::probe(from, 0, MPI_COMM_WORLD, &status);
278     size=status.count;
279   }
280
281   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
282
283   TRACE_smpi_comm_out(my_proc_id);
284   if (not TRACE_smpi_view_internals()) {
285     TRACE_smpi_recv(src_traced, my_proc_id, 0);
286   }
287
288   log_timed_action (action, clock);
289 }
290
291 static void action_Irecv(const char *const *action)
292 {
293   CHECK_ACTION_PARAMS(action, 2, 1)
294   int from = atoi(action[2]);
295   double size=parse_double(action[3]);
296   double clock = smpi_process()->simulated_elapsed();
297
298   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
299
300   int my_proc_id = Actor::self()->getPid();
301   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
302                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
303   MPI_Status status;
304   //unknow size from the receiver pov
305   if (size <= 0.0) {
306     Request::probe(from, 0, MPI_COMM_WORLD, &status);
307     size = status.count;
308   }
309
310   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
311
312   TRACE_smpi_comm_out(my_proc_id);
313   get_reqq_self()->push_back(request);
314
315   log_timed_action (action, clock);
316 }
317
318 static void action_test(const char* const* action)
319 {
320   CHECK_ACTION_PARAMS(action, 0, 0)
321   double clock = smpi_process()->simulated_elapsed();
322   MPI_Status status;
323
324   MPI_Request request = get_reqq_self()->back();
325   get_reqq_self()->pop_back();
326   //if request is null here, this may mean that a previous test has succeeded
327   //Different times in traced application and replayed version may lead to this
328   //In this case, ignore the extra calls.
329   if(request!=nullptr){
330     int my_proc_id = Actor::self()->getPid();
331     TRACE_smpi_testing_in(my_proc_id);
332
333     int flag = Request::test(&request, &status);
334
335     XBT_DEBUG("MPI_Test result: %d", flag);
336     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
337     get_reqq_self()->push_back(request);
338
339     TRACE_smpi_testing_out(my_proc_id);
340   }
341   log_timed_action (action, clock);
342 }
343
344 static void action_wait(const char *const *action){
345   CHECK_ACTION_PARAMS(action, 0, 0)
346   double clock = smpi_process()->simulated_elapsed();
347   MPI_Status status;
348
349   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
350       xbt_str_join_array(action," "));
351   MPI_Request request = get_reqq_self()->back();
352   get_reqq_self()->pop_back();
353
354   if (request==nullptr){
355     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
356     return;
357   }
358
359   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
360
361   MPI_Group group = request->comm()->group();
362   int src_traced = group->rank(request->src());
363   int dst_traced = group->rank(request->dst());
364   int is_wait_for_receive = (request->flags() & RECV);
365   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
366
367   Request::wait(&request, &status);
368
369   TRACE_smpi_comm_out(rank);
370   if (is_wait_for_receive)
371     TRACE_smpi_recv(src_traced, dst_traced, 0);
372   log_timed_action (action, clock);
373 }
374
375 static void action_waitall(const char *const *action){
376   CHECK_ACTION_PARAMS(action, 0, 0)
377   double clock = smpi_process()->simulated_elapsed();
378   const unsigned int count_requests = get_reqq_self()->size();
379
380   if (count_requests>0) {
381     MPI_Status status[count_requests];
382
383     int my_proc_id_traced = Actor::self()->getPid();
384     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
385                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
386     int recvs_snd[count_requests];
387     int recvs_rcv[count_requests];
388     for (unsigned int i = 0; i < count_requests; i++) {
389       const auto& req = (*get_reqq_self())[i];
390       if (req && (req->flags() & RECV)) {
391         recvs_snd[i] = req->src();
392         recvs_rcv[i] = req->dst();
393       } else
394         recvs_snd[i] = -100;
395    }
396    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
397
398    for (unsigned i = 0; i < count_requests; i++) {
399      if (recvs_snd[i]!=-100)
400        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
401    }
402    TRACE_smpi_comm_out(my_proc_id_traced);
403   }
404   log_timed_action (action, clock);
405 }
406
407 static void action_barrier(const char *const *action){
408   double clock = smpi_process()->simulated_elapsed();
409   int my_proc_id = Actor::self()->getPid();
410   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
411
412   Colls::barrier(MPI_COMM_WORLD);
413
414   TRACE_smpi_comm_out(my_proc_id);
415   log_timed_action (action, clock);
416 }
417
418 static void action_bcast(const char *const *action)
419 {
420   CHECK_ACTION_PARAMS(action, 1, 2)
421   double size = parse_double(action[2]);
422   double clock = smpi_process()->simulated_elapsed();
423   int root     = (action[3]) ? atoi(action[3]) : 0;
424   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
425   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
426
427   int my_proc_id = Actor::self()->getPid();
428   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
430                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
431
432   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
433
434   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
435
436   TRACE_smpi_comm_out(my_proc_id);
437   log_timed_action (action, clock);
438 }
439
440 static void action_reduce(const char *const *action)
441 {
442   CHECK_ACTION_PARAMS(action, 2, 2)
443   double comm_size = parse_double(action[2]);
444   double comp_size = parse_double(action[3]);
445   double clock = smpi_process()->simulated_elapsed();
446   int root         = (action[4]) ? atoi(action[4]) : 0;
447
448   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
449
450   int my_proc_id = Actor::self()->getPid();
451   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
452                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
453                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
454
455   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
458   smpi_execute_flops(comp_size);
459
460   TRACE_smpi_comm_out(my_proc_id);
461   log_timed_action (action, clock);
462 }
463
464 static void action_allReduce(const char *const *action) {
465   CHECK_ACTION_PARAMS(action, 2, 1)
466   double comm_size = parse_double(action[2]);
467   double comp_size = parse_double(action[3]);
468
469   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
470
471   double clock = smpi_process()->simulated_elapsed();
472   int my_proc_id = Actor::self()->getPid();
473   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
474                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
475
476   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
479   smpi_execute_flops(comp_size);
480
481   TRACE_smpi_comm_out(my_proc_id);
482   log_timed_action (action, clock);
483 }
484
485 static void action_allToAll(const char *const *action) {
486   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
487   double clock = smpi_process()->simulated_elapsed();
488   int comm_size = MPI_COMM_WORLD->size();
489   int send_size = parse_double(action[2]);
490   int recv_size = parse_double(action[3]);
491   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
492   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
493
494   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
495   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
496
497   int my_proc_id = Actor::self()->getPid();
498   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
499                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
500                                                     encode_datatype(MPI_CURRENT_TYPE),
501                                                     encode_datatype(MPI_CURRENT_TYPE2)));
502
503   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
504
505   TRACE_smpi_comm_out(my_proc_id);
506   log_timed_action (action, clock);
507 }
508
509 static void action_gather(const char *const *action) {
510   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
511         0 gather 68 68 0 0 0
512       where:
513         1) 68 is the sendcounts
514         2) 68 is the recvcounts
515         3) 0 is the root node
516         4) 0 is the send datatype id, see decode_datatype()
517         5) 0 is the recv datatype id, see decode_datatype()
518   */
519   CHECK_ACTION_PARAMS(action, 2, 3)
520   double clock = smpi_process()->simulated_elapsed();
521   int comm_size = MPI_COMM_WORLD->size();
522   int send_size = parse_double(action[2]);
523   int recv_size = parse_double(action[3]);
524   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
525   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
526
527   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528   void *recv = nullptr;
529   int root   = (action[4]) ? atoi(action[4]) : 0;
530   int rank = MPI_COMM_WORLD->rank();
531
532   if(rank==root)
533     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
534
535   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
536                                                                         encode_datatype(MPI_CURRENT_TYPE),
537                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
538
539   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
540
541   TRACE_smpi_comm_out(Actor::self()->getPid());
542   log_timed_action (action, clock);
543 }
544
545 static void action_scatter(const char* const* action)
546 {
547   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
548         0 gather 68 68 0 0 0
549       where:
550         1) 68 is the sendcounts
551         2) 68 is the recvcounts
552         3) 0 is the root node
553         4) 0 is the send datatype id, see decode_datatype()
554         5) 0 is the recv datatype id, see decode_datatype()
555   */
556   CHECK_ACTION_PARAMS(action, 2, 3)
557   double clock                   = smpi_process()->simulated_elapsed();
558   int comm_size                  = MPI_COMM_WORLD->size();
559   int send_size                  = parse_double(action[2]);
560   int recv_size                  = parse_double(action[3]);
561   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
562   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
563
564   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
565   void* recv = nullptr;
566   int root   = (action[4]) ? atoi(action[4]) : 0;
567   int rank = MPI_COMM_WORLD->rank();
568
569   if (rank == root)
570     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
571
572   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
573                                                                         encode_datatype(MPI_CURRENT_TYPE),
574                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
575
576   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
577
578   TRACE_smpi_comm_out(Actor::self()->getPid());
579   log_timed_action(action, clock);
580 }
581
582 static void action_gatherv(const char *const *action) {
583   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
584        0 gather 68 68 10 10 10 0 0 0
585      where:
586        1) 68 is the sendcount
587        2) 68 10 10 10 is the recvcounts
588        3) 0 is the root node
589        4) 0 is the send datatype id, see decode_datatype()
590        5) 0 is the recv datatype id, see decode_datatype()
591   */
592   double clock = smpi_process()->simulated_elapsed();
593   int comm_size = MPI_COMM_WORLD->size();
594   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
595   int send_size = parse_double(action[2]);
596   std::vector<int> disps(comm_size, 0);
597   int recvcounts[comm_size];
598   int recv_sum=0;
599
600   MPI_CURRENT_TYPE =
601       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
602   MPI_Datatype MPI_CURRENT_TYPE2{
603       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
604
605   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
606   void *recv = nullptr;
607   for(int i=0;i<comm_size;i++) {
608     recvcounts[i] = atoi(action[i+3]);
609     recv_sum=recv_sum+recvcounts[i];
610   }
611
612   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
613   int rank = MPI_COMM_WORLD->rank();
614
615   if(rank==root)
616     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
617
618   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
619
620   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
621                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
622                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
623
624   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps.data(), MPI_CURRENT_TYPE2, root,
625                  MPI_COMM_WORLD);
626
627   TRACE_smpi_comm_out(Actor::self()->getPid());
628   log_timed_action (action, clock);
629 }
630
631 static void action_scatterv(const char* const* action)
632 {
633   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
634        0 gather 68 10 10 10 68 0 0 0
635      where:
636        1) 68 10 10 10 is the sendcounts
637        2) 68 is the recvcount
638        3) 0 is the root node
639        4) 0 is the send datatype id, see decode_datatype()
640        5) 0 is the recv datatype id, see decode_datatype()
641   */
642   double clock  = smpi_process()->simulated_elapsed();
643   int comm_size = MPI_COMM_WORLD->size();
644   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
645   int recv_size = parse_double(action[2 + comm_size]);
646   std::vector<int> disps(comm_size, 0);
647   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
648
649   MPI_CURRENT_TYPE =
650       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
651   MPI_Datatype MPI_CURRENT_TYPE2{
652       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
653
654   void* send = nullptr;
655   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
656   for (int i = 0; i < comm_size; i++) {
657     (*sendcounts)[i] = atoi(action[i + 2]);
658   }
659   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
660
661   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
662   int rank = MPI_COMM_WORLD->rank();
663
664   if (rank == root)
665     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
666
667   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
668                                                                            nullptr, encode_datatype(MPI_CURRENT_TYPE),
669                                                                            encode_datatype(MPI_CURRENT_TYPE2)));
670
671   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
672                   MPI_COMM_WORLD);
673
674   TRACE_smpi_comm_out(Actor::self()->getPid());
675   log_timed_action(action, clock);
676 }
677
678 static void action_reducescatter(const char *const *action) {
679  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
680       0 reduceScatter 275427 275427 275427 204020 11346849 0
681     where:
682       1) The first four values after the name of the action declare the recvcounts array
683       2) The value 11346849 is the amount of instructions
684       3) The last value corresponds to the datatype, see decode_datatype().
685 */
686   double clock = smpi_process()->simulated_elapsed();
687   int comm_size = MPI_COMM_WORLD->size();
688   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
689   int comp_size = parse_double(action[2+comm_size]);
690   int my_proc_id                     = Actor::self()->getPid();
691   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
692   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
693
694   for(int i=0;i<comm_size;i++) {
695     recvcounts->push_back(atoi(action[i + 2]));
696   }
697   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
698
699   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
700                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
701                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
702                                                        encode_datatype(MPI_CURRENT_TYPE)));
703
704   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
705   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
706
707   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
708   smpi_execute_flops(comp_size);
709
710   TRACE_smpi_comm_out(my_proc_id);
711   log_timed_action (action, clock);
712 }
713
714 static void action_allgather(const char *const *action) {
715   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
716         0 allGather 275427 275427
717     where:
718         1) 275427 is the sendcount
719         2) 275427 is the recvcount
720         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
721   */
722   double clock = smpi_process()->simulated_elapsed();
723
724   CHECK_ACTION_PARAMS(action, 2, 2)
725   int sendcount=atoi(action[2]);
726   int recvcount=atoi(action[3]);
727
728   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
729   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
730
731   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
732   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
733
734   int my_proc_id = Actor::self()->getPid();
735
736   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
737                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
738                                                     encode_datatype(MPI_CURRENT_TYPE),
739                                                     encode_datatype(MPI_CURRENT_TYPE2)));
740
741   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
742
743   TRACE_smpi_comm_out(my_proc_id);
744   log_timed_action (action, clock);
745 }
746
747 static void action_allgatherv(const char *const *action) {
748   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
749         0 allGatherV 275427 275427 275427 275427 204020
750      where:
751         1) 275427 is the sendcount
752         2) The next four elements declare the recvcounts array
753         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
754   */
755   double clock = smpi_process()->simulated_elapsed();
756
757   int comm_size = MPI_COMM_WORLD->size();
758   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
759   int sendcount=atoi(action[2]);
760   int recvcounts[comm_size];
761   std::vector<int> disps(comm_size, 0);
762   int recv_sum=0;
763
764   MPI_CURRENT_TYPE =
765       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
766   MPI_Datatype MPI_CURRENT_TYPE2{
767       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
768
769   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
770
771   for(int i=0;i<comm_size;i++) {
772     recvcounts[i] = atoi(action[i+3]);
773     recv_sum=recv_sum+recvcounts[i];
774   }
775   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
776
777   int my_proc_id = Actor::self()->getPid();
778
779   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
780
781   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
782                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
783                                                        encode_datatype(MPI_CURRENT_TYPE),
784                                                        encode_datatype(MPI_CURRENT_TYPE2)));
785
786   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
787                           MPI_COMM_WORLD);
788
789   TRACE_smpi_comm_out(my_proc_id);
790   log_timed_action (action, clock);
791 }
792
793 static void action_allToAllv(const char *const *action) {
794   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
795         0 allToAllV 100 1 7 10 12 100 1 70 10 5
796      where:
797         1) 100 is the size of the send buffer *sizeof(int),
798         2) 1 7 10 12 is the sendcounts array
799         3) 100*sizeof(int) is the size of the receiver buffer
800         4)  1 70 10 5 is the recvcounts array
801   */
802   double clock = smpi_process()->simulated_elapsed();
803
804   int comm_size = MPI_COMM_WORLD->size();
805   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
806   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
807   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
808   std::vector<int> senddisps(comm_size, 0);
809   std::vector<int> recvdisps(comm_size, 0);
810
811   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
812                          ? decode_datatype(action[4 + 2 * comm_size])
813                          : MPI_DEFAULT_TYPE;
814   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
815                                      ? decode_datatype(action[5 + 2 * comm_size])
816                                      : MPI_DEFAULT_TYPE};
817
818   int send_buf_size=parse_double(action[2]);
819   int recv_buf_size=parse_double(action[3+comm_size]);
820   int my_proc_id = Actor::self()->getPid();
821   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
822   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
823
824   for(int i=0;i<comm_size;i++) {
825     (*sendcounts)[i] = atoi(action[3 + i]);
826     (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
827   }
828   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
829   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
830
831   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
832                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
833                                                        encode_datatype(MPI_CURRENT_TYPE),
834                                                        encode_datatype(MPI_CURRENT_TYPE2)));
835
836   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
837                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
838
839   TRACE_smpi_comm_out(my_proc_id);
840   log_timed_action (action, clock);
841 }
842
843 }} // namespace simgrid::smpi
844
845 /** @brief Only initialize the replay, don't do it for real */
846 void smpi_replay_init(int* argc, char*** argv)
847 {
848   simgrid::smpi::Process::init(argc, argv);
849   smpi_process()->mark_as_initialized();
850   smpi_process()->set_replaying(true);
851
852   int my_proc_id = Actor::self()->getPid();
853   TRACE_smpi_init(my_proc_id);
854   TRACE_smpi_computing_init(my_proc_id);
855   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
856   TRACE_smpi_comm_out(my_proc_id);
857   xbt_replay_action_register("init",       simgrid::smpi::action_init);
858   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
859   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
860   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
861   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
862   xbt_replay_action_register("send",       simgrid::smpi::action_send);
863   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
864   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
865   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
866   xbt_replay_action_register("test",       simgrid::smpi::action_test);
867   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
868   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
869   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
870   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
871   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
872   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
873   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
874   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
875   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
876   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
877   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
878   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
879   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
880   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
881   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
882   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
883
884   //if we have a delayed start, sleep here.
885   if(*argc>2){
886     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
887     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
888     smpi_execute_flops(value);
889   } else {
890     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
891     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
892     smpi_execute_flops(0.0);
893   }
894 }
895
896 /** @brief actually run the replay after initialization */
897 void smpi_replay_main(int* argc, char*** argv)
898 {
899   simgrid::xbt::replay_runner(*argc, *argv);
900
901   /* and now, finalize everything */
902   /* One active process will stop. Decrease the counter*/
903   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
904   if (not get_reqq_self()->empty()) {
905     unsigned int count_requests=get_reqq_self()->size();
906     MPI_Request requests[count_requests];
907     MPI_Status status[count_requests];
908     unsigned int i=0;
909
910     for (auto const& req : *get_reqq_self()) {
911       requests[i] = req;
912       i++;
913     }
914     simgrid::smpi::Request::waitall(count_requests, requests, status);
915   }
916   delete get_reqq_self();
917   active_processes--;
918
919   if(active_processes==0){
920     /* Last process alive speaking: end the simulated timer */
921     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
922     xbt_free(sendbuffer);
923     xbt_free(recvbuffer);
924   }
925
926   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
927
928   smpi_process()->finalize();
929
930   TRACE_smpi_comm_out(Actor::self()->getPid());
931   TRACE_smpi_finalize(Actor::self()->getPid());
932 }
933
934 /** @brief chain a replay initialization and a replay start */
935 void smpi_replay_run(int* argc, char*** argv)
936 {
937   smpi_replay_init(argc, argv);
938   smpi_replay_main(argc, argv);
939 }