Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
0241c05bebb673667bc3e2660feee69988693c18
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 using simgrid::s4u::Actor;
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 static int communicator_size = 0;
23 static int active_processes  = 0;
24 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
25
26 static MPI_Datatype MPI_DEFAULT_TYPE;
27 static MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size = 0;
30 static char* sendbuffer    = nullptr;
31 static int recvbuffer_size = 0;
32 static char* recvbuffer    = nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(Actor::self()->getPid());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({Actor::self()->getPid(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90
91 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
92 static MPI_Datatype decode_datatype(const char *const action)
93 {
94   switch(atoi(action)) {
95     case 0:
96       return MPI_DOUBLE;
97       break;
98     case 1:
99       return MPI_INT;
100       break;
101     case 2:
102       return MPI_CHAR;
103       break;
104     case 3:
105       return MPI_SHORT;
106       break;
107     case 4:
108       return MPI_LONG;
109       break;
110     case 5:
111       return MPI_FLOAT;
112       break;
113     case 6:
114       return MPI_BYTE;
115       break;
116     default:
117       return MPI_DEFAULT_TYPE;
118       break;
119   }
120 }
121
122 const char* encode_datatype(MPI_Datatype datatype)
123 {
124   if (datatype==MPI_BYTE)
125       return "";
126   if(datatype==MPI_DOUBLE)
127       return "0";
128   if(datatype==MPI_INT)
129       return "1";
130   if(datatype==MPI_CHAR)
131       return "2";
132   if(datatype==MPI_SHORT)
133       return "3";
134   if(datatype==MPI_LONG)
135     return "4";
136   if(datatype==MPI_FLOAT)
137       return "5";
138   // default - not implemented.
139   // do not warn here as we pass in this function even for other trace formats
140   return "-1";
141 }
142
143 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
144     int i=0;\
145     while(action[i]!=nullptr)\
146      i++;\
147     if(i<mandatory+2)                                           \
148     THROWF(arg_error, 0, "%s replay failed.\n" \
149           "%d items were given on the line. First two should be process_id and action.  " \
150           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
151           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
152   }
153
154 namespace simgrid {
155 namespace smpi {
156
157 static void action_init(const char *const *action)
158 {
159   XBT_DEBUG("Initialize the counters");
160   CHECK_ACTION_PARAMS(action, 0, 1)
161   if(action[2])
162     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
163   else
164     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
165
166   /* start a simulated timer */
167   smpi_process()->simulated_start();
168   /*initialize the number of active processes */
169   active_processes = smpi_process_count();
170
171   set_reqq_self(new std::vector<MPI_Request>);
172 }
173
174 static void action_finalize(const char *const *action)
175 {
176   /* Nothing to do */
177 }
178
179 static void action_comm_size(const char *const *action)
180 {
181   communicator_size = parse_double(action[2]);
182   log_timed_action (action, smpi_process()->simulated_elapsed());
183 }
184
185 static void action_comm_split(const char *const *action)
186 {
187   log_timed_action (action, smpi_process()->simulated_elapsed());
188 }
189
190 static void action_comm_dup(const char *const *action)
191 {
192   log_timed_action (action, smpi_process()->simulated_elapsed());
193 }
194
195 static void action_compute(const char *const *action)
196 {
197   CHECK_ACTION_PARAMS(action, 1, 0)
198   double clock = smpi_process()->simulated_elapsed();
199   double flops= parse_double(action[2]);
200   int my_proc_id = Actor::self()->getPid();
201
202   TRACE_smpi_computing_in(my_proc_id, flops);
203   smpi_execute_flops(flops);
204   TRACE_smpi_computing_out(my_proc_id);
205
206   log_timed_action (action, clock);
207 }
208
209 static void action_send(const char *const *action)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   int to = atoi(action[2]);
213   double size=parse_double(action[3]);
214   double clock = smpi_process()->simulated_elapsed();
215
216   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
217
218   int my_proc_id = Actor::self()->getPid();
219   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
220
221   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
222                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
223   if (not TRACE_smpi_view_internals())
224     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
225
226   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227
228   TRACE_smpi_comm_out(my_proc_id);
229
230   log_timed_action(action, clock);
231 }
232
233 static void action_Isend(const char *const *action)
234 {
235   CHECK_ACTION_PARAMS(action, 2, 1)
236   int to = atoi(action[2]);
237   double size=parse_double(action[3]);
238   double clock = smpi_process()->simulated_elapsed();
239
240   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
241
242   int my_proc_id = Actor::self()->getPid();
243   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
244   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
245                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
246   if (not TRACE_smpi_view_internals())
247     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
248
249   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
250
251   TRACE_smpi_comm_out(my_proc_id);
252
253   get_reqq_self()->push_back(request);
254
255   log_timed_action (action, clock);
256 }
257
258 static void action_recv(const char *const *action) {
259   CHECK_ACTION_PARAMS(action, 2, 1)
260   int from = atoi(action[2]);
261   double size=parse_double(action[3]);
262   double clock = smpi_process()->simulated_elapsed();
263   MPI_Status status;
264
265   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
266
267   int my_proc_id = Actor::self()->getPid();
268   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
269
270   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
271                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
272
273   //unknown size from the receiver point of view
274   if (size <= 0.0) {
275     Request::probe(from, 0, MPI_COMM_WORLD, &status);
276     size=status.count;
277   }
278
279   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
280
281   TRACE_smpi_comm_out(my_proc_id);
282   if (not TRACE_smpi_view_internals()) {
283     TRACE_smpi_recv(src_traced, my_proc_id, 0);
284   }
285
286   log_timed_action (action, clock);
287 }
288
289 static void action_Irecv(const char *const *action)
290 {
291   CHECK_ACTION_PARAMS(action, 2, 1)
292   int from = atoi(action[2]);
293   double size=parse_double(action[3]);
294   double clock = smpi_process()->simulated_elapsed();
295
296   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
297
298   int my_proc_id = Actor::self()->getPid();
299   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
300                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
301   MPI_Status status;
302   //unknow size from the receiver pov
303   if (size <= 0.0) {
304     Request::probe(from, 0, MPI_COMM_WORLD, &status);
305     size = status.count;
306   }
307
308   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
309
310   TRACE_smpi_comm_out(my_proc_id);
311   get_reqq_self()->push_back(request);
312
313   log_timed_action (action, clock);
314 }
315
316 static void action_test(const char* const* action)
317 {
318   CHECK_ACTION_PARAMS(action, 0, 0)
319   double clock = smpi_process()->simulated_elapsed();
320   MPI_Status status;
321
322   MPI_Request request = get_reqq_self()->back();
323   get_reqq_self()->pop_back();
324   //if request is null here, this may mean that a previous test has succeeded
325   //Different times in traced application and replayed version may lead to this
326   //In this case, ignore the extra calls.
327   if(request!=nullptr){
328     int my_proc_id = Actor::self()->getPid();
329     TRACE_smpi_testing_in(my_proc_id);
330
331     int flag = Request::test(&request, &status);
332
333     XBT_DEBUG("MPI_Test result: %d", flag);
334     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
335     get_reqq_self()->push_back(request);
336
337     TRACE_smpi_testing_out(my_proc_id);
338   }
339   log_timed_action (action, clock);
340 }
341
342 static void action_wait(const char *const *action){
343   CHECK_ACTION_PARAMS(action, 0, 0)
344   double clock = smpi_process()->simulated_elapsed();
345   MPI_Status status;
346
347   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
348       xbt_str_join_array(action," "));
349   MPI_Request request = get_reqq_self()->back();
350   get_reqq_self()->pop_back();
351
352   if (request==nullptr){
353     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
354     return;
355   }
356
357   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
358
359   MPI_Group group = request->comm()->group();
360   int src_traced = group->rank(request->src());
361   int dst_traced = group->rank(request->dst());
362   int is_wait_for_receive = (request->flags() & RECV);
363   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
364
365   Request::wait(&request, &status);
366
367   TRACE_smpi_comm_out(rank);
368   if (is_wait_for_receive)
369     TRACE_smpi_recv(src_traced, dst_traced, 0);
370   log_timed_action (action, clock);
371 }
372
373 static void action_waitall(const char *const *action){
374   CHECK_ACTION_PARAMS(action, 0, 0)
375   double clock = smpi_process()->simulated_elapsed();
376   const unsigned int count_requests = get_reqq_self()->size();
377
378   if (count_requests>0) {
379     MPI_Status status[count_requests];
380
381     int my_proc_id_traced = Actor::self()->getPid();
382     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
383                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
384     int recvs_snd[count_requests];
385     int recvs_rcv[count_requests];
386     for (unsigned int i = 0; i < count_requests; i++) {
387       const auto& req = (*get_reqq_self())[i];
388       if (req && (req->flags() & RECV)) {
389         recvs_snd[i] = req->src();
390         recvs_rcv[i] = req->dst();
391       } else
392         recvs_snd[i] = -100;
393    }
394    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
395
396    for (unsigned i = 0; i < count_requests; i++) {
397      if (recvs_snd[i]!=-100)
398        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
399    }
400    TRACE_smpi_comm_out(my_proc_id_traced);
401   }
402   log_timed_action (action, clock);
403 }
404
405 static void action_barrier(const char *const *action){
406   double clock = smpi_process()->simulated_elapsed();
407   int my_proc_id = Actor::self()->getPid();
408   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
409
410   Colls::barrier(MPI_COMM_WORLD);
411
412   TRACE_smpi_comm_out(my_proc_id);
413   log_timed_action (action, clock);
414 }
415
416 static void action_bcast(const char *const *action)
417 {
418   CHECK_ACTION_PARAMS(action, 1, 2)
419   double size = parse_double(action[2]);
420   double clock = smpi_process()->simulated_elapsed();
421   int root     = (action[3]) ? atoi(action[3]) : 0;
422   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
423   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
424
425   int my_proc_id = Actor::self()->getPid();
426   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
427                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
428                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
429
430   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
431
432   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
433
434   TRACE_smpi_comm_out(my_proc_id);
435   log_timed_action (action, clock);
436 }
437
438 static void action_reduce(const char *const *action)
439 {
440   CHECK_ACTION_PARAMS(action, 2, 2)
441   double comm_size = parse_double(action[2]);
442   double comp_size = parse_double(action[3]);
443   double clock = smpi_process()->simulated_elapsed();
444   int root         = (action[4]) ? atoi(action[4]) : 0;
445
446   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
447
448   int my_proc_id = Actor::self()->getPid();
449   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
450                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
451                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
452
453   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
454   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
455   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
456   smpi_execute_flops(comp_size);
457
458   TRACE_smpi_comm_out(my_proc_id);
459   log_timed_action (action, clock);
460 }
461
462 static void action_allReduce(const char *const *action) {
463   CHECK_ACTION_PARAMS(action, 2, 1)
464   double comm_size = parse_double(action[2]);
465   double comp_size = parse_double(action[3]);
466
467   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
468
469   double clock = smpi_process()->simulated_elapsed();
470   int my_proc_id = Actor::self()->getPid();
471   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
472                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
473
474   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
475   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
476   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
477   smpi_execute_flops(comp_size);
478
479   TRACE_smpi_comm_out(my_proc_id);
480   log_timed_action (action, clock);
481 }
482
483 static void action_allToAll(const char *const *action) {
484   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
485   double clock = smpi_process()->simulated_elapsed();
486   int comm_size = MPI_COMM_WORLD->size();
487   int send_size = parse_double(action[2]);
488   int recv_size = parse_double(action[3]);
489   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
490   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
491
492   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
493   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
494
495   int my_proc_id = Actor::self()->getPid();
496   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
497                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
498                                                     encode_datatype(MPI_CURRENT_TYPE),
499                                                     encode_datatype(MPI_CURRENT_TYPE2)));
500
501   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
502
503   TRACE_smpi_comm_out(my_proc_id);
504   log_timed_action (action, clock);
505 }
506
507 static void action_gather(const char *const *action) {
508   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
509         0 gather 68 68 0 0 0
510       where:
511         1) 68 is the sendcounts
512         2) 68 is the recvcounts
513         3) 0 is the root node
514         4) 0 is the send datatype id, see decode_datatype()
515         5) 0 is the recv datatype id, see decode_datatype()
516   */
517   CHECK_ACTION_PARAMS(action, 2, 3)
518   double clock = smpi_process()->simulated_elapsed();
519   int comm_size = MPI_COMM_WORLD->size();
520   int send_size = parse_double(action[2]);
521   int recv_size = parse_double(action[3]);
522   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
523   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
524
525   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
526   void *recv = nullptr;
527   int root   = (action[4]) ? atoi(action[4]) : 0;
528   int rank = MPI_COMM_WORLD->rank();
529
530   if(rank==root)
531     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
532
533   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
534                                                                         encode_datatype(MPI_CURRENT_TYPE),
535                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
536
537   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
538
539   TRACE_smpi_comm_out(Actor::self()->getPid());
540   log_timed_action (action, clock);
541 }
542
543 static void action_scatter(const char* const* action)
544 {
545   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
546         0 gather 68 68 0 0 0
547       where:
548         1) 68 is the sendcounts
549         2) 68 is the recvcounts
550         3) 0 is the root node
551         4) 0 is the send datatype id, see decode_datatype()
552         5) 0 is the recv datatype id, see decode_datatype()
553   */
554   CHECK_ACTION_PARAMS(action, 2, 3)
555   double clock                   = smpi_process()->simulated_elapsed();
556   int comm_size                  = MPI_COMM_WORLD->size();
557   int send_size                  = parse_double(action[2]);
558   int recv_size                  = parse_double(action[3]);
559   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
560   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
561
562   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
563   void* recv = nullptr;
564   int root   = (action[4]) ? atoi(action[4]) : 0;
565   int rank = MPI_COMM_WORLD->rank();
566
567   if (rank == root)
568     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
569
570   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
571                                                                         encode_datatype(MPI_CURRENT_TYPE),
572                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
573
574   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
575
576   TRACE_smpi_comm_out(Actor::self()->getPid());
577   log_timed_action(action, clock);
578 }
579
580 static void action_gatherv(const char *const *action) {
581   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
582        0 gather 68 68 10 10 10 0 0 0
583      where:
584        1) 68 is the sendcount
585        2) 68 10 10 10 is the recvcounts
586        3) 0 is the root node
587        4) 0 is the send datatype id, see decode_datatype()
588        5) 0 is the recv datatype id, see decode_datatype()
589   */
590   double clock = smpi_process()->simulated_elapsed();
591   int comm_size = MPI_COMM_WORLD->size();
592   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
593   int send_size = parse_double(action[2]);
594   int disps[comm_size];
595   int recvcounts[comm_size];
596   int recv_sum=0;
597
598   MPI_CURRENT_TYPE =
599       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
600   MPI_Datatype MPI_CURRENT_TYPE2{
601       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
602
603   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
604   void *recv = nullptr;
605   for(int i=0;i<comm_size;i++) {
606     recvcounts[i] = atoi(action[i+3]);
607     recv_sum=recv_sum+recvcounts[i];
608     disps[i]=0;
609   }
610
611   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
612   int rank = MPI_COMM_WORLD->rank();
613
614   if(rank==root)
615     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
616
617   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
618
619   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
620                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
621                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
622
623   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
624
625   TRACE_smpi_comm_out(Actor::self()->getPid());
626   log_timed_action (action, clock);
627 }
628
629 static void action_scatterv(const char* const* action)
630 {
631   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
632        0 gather 68 10 10 10 68 0 0 0
633      where:
634        1) 68 10 10 10 is the sendcounts
635        2) 68 is the recvcount
636        3) 0 is the root node
637        4) 0 is the send datatype id, see decode_datatype()
638        5) 0 is the recv datatype id, see decode_datatype()
639   */
640   double clock  = smpi_process()->simulated_elapsed();
641   int comm_size = MPI_COMM_WORLD->size();
642   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
643   int recv_size = parse_double(action[2 + comm_size]);
644   int disps[comm_size];
645   int sendcounts[comm_size];
646   int send_sum = 0;
647
648   MPI_CURRENT_TYPE =
649       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
650   MPI_Datatype MPI_CURRENT_TYPE2{
651       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
652
653   void* send = nullptr;
654   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
655   for (int i = 0; i < comm_size; i++) {
656     sendcounts[i] = atoi(action[i + 2]);
657     send_sum += sendcounts[i];
658     disps[i] = 0;
659   }
660
661   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
662   int rank = MPI_COMM_WORLD->rank();
663
664   if (rank == root)
665     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
666
667   std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
668
669   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
670                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
671                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
672
673   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
674
675   TRACE_smpi_comm_out(Actor::self()->getPid());
676   log_timed_action(action, clock);
677 }
678
679 static void action_reducescatter(const char *const *action) {
680  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
681       0 reduceScatter 275427 275427 275427 204020 11346849 0
682     where:
683       1) The first four values after the name of the action declare the recvcounts array
684       2) The value 11346849 is the amount of instructions
685       3) The last value corresponds to the datatype, see decode_datatype().
686 */
687   double clock = smpi_process()->simulated_elapsed();
688   int comm_size = MPI_COMM_WORLD->size();
689   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
690   int comp_size = parse_double(action[2+comm_size]);
691   int recvcounts[comm_size];
692   int my_proc_id                     = Actor::self()->getPid();
693   int size = 0;
694   std::vector<int>* trace_recvcounts = new std::vector<int>;
695   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
696
697   for(int i=0;i<comm_size;i++) {
698     recvcounts[i] = atoi(action[i+2]);
699     trace_recvcounts->push_back(recvcounts[i]);
700     size+=recvcounts[i];
701   }
702
703   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
704                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
705                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
706                                                        encode_datatype(MPI_CURRENT_TYPE)));
707
708   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
709   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
710
711   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
712   smpi_execute_flops(comp_size);
713
714   TRACE_smpi_comm_out(my_proc_id);
715   log_timed_action (action, clock);
716 }
717
718 static void action_allgather(const char *const *action) {
719   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
720         0 allGather 275427 275427
721     where:
722         1) 275427 is the sendcount
723         2) 275427 is the recvcount
724         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
725   */
726   double clock = smpi_process()->simulated_elapsed();
727
728   CHECK_ACTION_PARAMS(action, 2, 2)
729   int sendcount=atoi(action[2]);
730   int recvcount=atoi(action[3]);
731
732   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
733   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
734
735   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
736   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
737
738   int my_proc_id = Actor::self()->getPid();
739
740   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
741                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
742                                                     encode_datatype(MPI_CURRENT_TYPE),
743                                                     encode_datatype(MPI_CURRENT_TYPE2)));
744
745   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
746
747   TRACE_smpi_comm_out(my_proc_id);
748   log_timed_action (action, clock);
749 }
750
751 static void action_allgatherv(const char *const *action) {
752   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
753         0 allGatherV 275427 275427 275427 275427 204020
754      where:
755         1) 275427 is the sendcount
756         2) The next four elements declare the recvcounts array
757         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
758   */
759   double clock = smpi_process()->simulated_elapsed();
760
761   int comm_size = MPI_COMM_WORLD->size();
762   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
763   int sendcount=atoi(action[2]);
764   int recvcounts[comm_size];
765   int disps[comm_size];
766   int recv_sum=0;
767
768   MPI_CURRENT_TYPE =
769       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
770   MPI_Datatype MPI_CURRENT_TYPE2{
771       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
772
773   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
774
775   for(int i=0;i<comm_size;i++) {
776     recvcounts[i] = atoi(action[i+3]);
777     recv_sum=recv_sum+recvcounts[i];
778     disps[i] = 0;
779   }
780   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
781
782   int my_proc_id = Actor::self()->getPid();
783
784   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
785
786   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
787                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
788                                                        encode_datatype(MPI_CURRENT_TYPE),
789                                                        encode_datatype(MPI_CURRENT_TYPE2)));
790
791   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
792                           MPI_COMM_WORLD);
793
794   TRACE_smpi_comm_out(my_proc_id);
795   log_timed_action (action, clock);
796 }
797
798 static void action_allToAllv(const char *const *action) {
799   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
800         0 allToAllV 100 1 7 10 12 100 1 70 10 5
801      where:
802         1) 100 is the size of the send buffer *sizeof(int),
803         2) 1 7 10 12 is the sendcounts array
804         3) 100*sizeof(int) is the size of the receiver buffer
805         4)  1 70 10 5 is the recvcounts array
806   */
807   double clock = smpi_process()->simulated_elapsed();
808
809   int comm_size = MPI_COMM_WORLD->size();
810   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
811   int send_size = 0;
812   int recv_size = 0;
813   int sendcounts[comm_size];
814   std::vector<int>* trace_sendcounts = new std::vector<int>;
815   int recvcounts[comm_size];
816   std::vector<int>* trace_recvcounts = new std::vector<int>;
817   int senddisps[comm_size];
818   int recvdisps[comm_size];
819
820   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
821                          ? decode_datatype(action[4 + 2 * comm_size])
822                          : MPI_DEFAULT_TYPE;
823   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
824                                      ? decode_datatype(action[5 + 2 * comm_size])
825                                      : MPI_DEFAULT_TYPE};
826
827   int send_buf_size=parse_double(action[2]);
828   int recv_buf_size=parse_double(action[3+comm_size]);
829   int my_proc_id = Actor::self()->getPid();
830   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
831   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
832
833   for(int i=0;i<comm_size;i++) {
834     sendcounts[i] = atoi(action[i+3]);
835     trace_sendcounts->push_back(sendcounts[i]);
836     send_size += sendcounts[i];
837     recvcounts[i] = atoi(action[i+4+comm_size]);
838     trace_recvcounts->push_back(recvcounts[i]);
839     recv_size += recvcounts[i];
840     senddisps[i] = 0;
841     recvdisps[i] = 0;
842   }
843
844   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
845                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
846                                                        trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
847                                                        encode_datatype(MPI_CURRENT_TYPE2)));
848
849   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
850                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
851
852   TRACE_smpi_comm_out(my_proc_id);
853   log_timed_action (action, clock);
854 }
855
856 }} // namespace simgrid::smpi
857
858 /** @brief Only initialize the replay, don't do it for real */
859 void smpi_replay_init(int* argc, char*** argv)
860 {
861   simgrid::smpi::Process::init(argc, argv);
862   smpi_process()->mark_as_initialized();
863   smpi_process()->set_replaying(true);
864
865   int my_proc_id = Actor::self()->getPid();
866   TRACE_smpi_init(my_proc_id);
867   TRACE_smpi_computing_init(my_proc_id);
868   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
869   TRACE_smpi_comm_out(my_proc_id);
870   xbt_replay_action_register("init",       simgrid::smpi::action_init);
871   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
872   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
873   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
874   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
875   xbt_replay_action_register("send",       simgrid::smpi::action_send);
876   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
877   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
878   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
879   xbt_replay_action_register("test",       simgrid::smpi::action_test);
880   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
881   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
882   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
883   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
884   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
885   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
886   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
887   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
888   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
889   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
890   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
891   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
892   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
893   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
894   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
895   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
896
897   //if we have a delayed start, sleep here.
898   if(*argc>2){
899     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
900     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
901     smpi_execute_flops(value);
902   } else {
903     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
904     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
905     smpi_execute_flops(0.0);
906   }
907 }
908
909 /** @brief actually run the replay after initialization */
910 void smpi_replay_main(int* argc, char*** argv)
911 {
912   simgrid::xbt::replay_runner(*argc, *argv);
913
914   /* and now, finalize everything */
915   /* One active process will stop. Decrease the counter*/
916   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
917   if (not get_reqq_self()->empty()) {
918     unsigned int count_requests=get_reqq_self()->size();
919     MPI_Request requests[count_requests];
920     MPI_Status status[count_requests];
921     unsigned int i=0;
922
923     for (auto const& req : *get_reqq_self()) {
924       requests[i] = req;
925       i++;
926     }
927     simgrid::smpi::Request::waitall(count_requests, requests, status);
928   }
929   delete get_reqq_self();
930   active_processes--;
931
932   if(active_processes==0){
933     /* Last process alive speaking: end the simulated timer */
934     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
935     xbt_free(sendbuffer);
936     xbt_free(recvbuffer);
937   }
938
939   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
940
941   smpi_process()->finalize();
942
943   TRACE_smpi_comm_out(Actor::self()->getPid());
944   TRACE_smpi_finalize(Actor::self()->getPid());
945 }
946
947 /** @brief chain a replay initialization and a replay start */
948 void smpi_replay_run(int* argc, char*** argv)
949 {
950   smpi_replay_init(argc, argv);
951   smpi_replay_main(argc, argv);
952 }