Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into actor-yield
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 int communicator_size = 0;
23 static int active_processes = 0;
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
25
26 MPI_Datatype MPI_DEFAULT_TYPE;
27 MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(smpi_process()->index());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({smpi_process()->index(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90 static MPI_Datatype decode_datatype(const char *const action)
91 {
92   switch(atoi(action)) {
93     case 0:
94       MPI_CURRENT_TYPE=MPI_DOUBLE;
95       break;
96     case 1:
97       MPI_CURRENT_TYPE=MPI_INT;
98       break;
99     case 2:
100       MPI_CURRENT_TYPE=MPI_CHAR;
101       break;
102     case 3:
103       MPI_CURRENT_TYPE=MPI_SHORT;
104       break;
105     case 4:
106       MPI_CURRENT_TYPE=MPI_LONG;
107       break;
108     case 5:
109       MPI_CURRENT_TYPE=MPI_FLOAT;
110       break;
111     case 6:
112       MPI_CURRENT_TYPE=MPI_BYTE;
113       break;
114     default:
115       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
116       break;
117   }
118    return MPI_CURRENT_TYPE;
119 }
120
121 const char* encode_datatype(MPI_Datatype datatype)
122 {
123   if (datatype==MPI_BYTE)
124       return "";
125   if(datatype==MPI_DOUBLE)
126       return "0";
127   if(datatype==MPI_INT)
128       return "1";
129   if(datatype==MPI_CHAR)
130       return "2";
131   if(datatype==MPI_SHORT)
132       return "3";
133   if(datatype==MPI_LONG)
134     return "4";
135   if(datatype==MPI_FLOAT)
136       return "5";
137   // default - not implemented.
138   // do not warn here as we pass in this function even for other trace formats
139   return "-1";
140 }
141
142 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
143     int i=0;\
144     while(action[i]!=nullptr)\
145      i++;\
146     if(i<mandatory+2)                                           \
147     THROWF(arg_error, 0, "%s replay failed.\n" \
148           "%d items were given on the line. First two should be process_id and action.  " \
149           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
150           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
151   }
152
153 namespace simgrid {
154 namespace smpi {
155
156 static void action_init(const char *const *action)
157 {
158   XBT_DEBUG("Initialize the counters");
159   CHECK_ACTION_PARAMS(action, 0, 1)
160   if(action[2])
161     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
162   else
163     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
164
165   /* start a simulated timer */
166   smpi_process()->simulated_start();
167   /*initialize the number of active processes */
168   active_processes = smpi_process_count();
169
170   set_reqq_self(new std::vector<MPI_Request>);
171 }
172
173 static void action_finalize(const char *const *action)
174 {
175   /* Nothing to do */
176 }
177
178 static void action_comm_size(const char *const *action)
179 {
180   communicator_size = parse_double(action[2]);
181   log_timed_action (action, smpi_process()->simulated_elapsed());
182 }
183
184 static void action_comm_split(const char *const *action)
185 {
186   log_timed_action (action, smpi_process()->simulated_elapsed());
187 }
188
189 static void action_comm_dup(const char *const *action)
190 {
191   log_timed_action (action, smpi_process()->simulated_elapsed());
192 }
193
194 static void action_compute(const char *const *action)
195 {
196   CHECK_ACTION_PARAMS(action, 1, 0)
197   double clock = smpi_process()->simulated_elapsed();
198   double flops= parse_double(action[2]);
199   int rank = smpi_process()->index();
200
201   TRACE_smpi_computing_in(rank, flops);
202   smpi_execute_flops(flops);
203   TRACE_smpi_computing_out(rank);
204
205   log_timed_action (action, clock);
206 }
207
208 static void action_send(const char *const *action)
209 {
210   CHECK_ACTION_PARAMS(action, 2, 1)
211   int to = atoi(action[2]);
212   double size=parse_double(action[3]);
213   double clock = smpi_process()->simulated_elapsed();
214
215   if(action[4])
216     MPI_CURRENT_TYPE=decode_datatype(action[4]);
217   else
218     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
219
220   int rank = smpi_process()->index();
221   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
222
223   TRACE_smpi_comm_in(rank, __FUNCTION__,
224                      new simgrid::instr::Pt2PtTIData("send", dst_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
225   if (not TRACE_smpi_view_internals())
226     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
227
228   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
229
230   TRACE_smpi_comm_out(rank);
231
232   log_timed_action(action, clock);
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   CHECK_ACTION_PARAMS(action, 2, 1)
238   int to = atoi(action[2]);
239   double size=parse_double(action[3]);
240   double clock = smpi_process()->simulated_elapsed();
241
242   if(action[4])
243     MPI_CURRENT_TYPE=decode_datatype(action[4]);
244   else
245     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
246
247   int rank = smpi_process()->index();
248   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
249   TRACE_smpi_comm_in(rank, __FUNCTION__,
250                      new simgrid::instr::Pt2PtTIData("Isend", dst_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
251   if (not TRACE_smpi_view_internals())
252     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
253
254   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
255
256   TRACE_smpi_comm_out(rank);
257
258   get_reqq_self()->push_back(request);
259
260   log_timed_action (action, clock);
261 }
262
263 static void action_recv(const char *const *action) {
264   CHECK_ACTION_PARAMS(action, 2, 1)
265   int from = atoi(action[2]);
266   double size=parse_double(action[3]);
267   double clock = smpi_process()->simulated_elapsed();
268   MPI_Status status;
269
270   if(action[4])
271     MPI_CURRENT_TYPE=decode_datatype(action[4]);
272   else
273     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
274
275   int rank = smpi_process()->index();
276   int src_traced = MPI_COMM_WORLD->group()->rank(from);
277
278   TRACE_smpi_comm_in(rank, __FUNCTION__,
279                      new simgrid::instr::Pt2PtTIData("recv", src_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
280
281   //unknown size from the receiver point of view
282   if (size <= 0.0) {
283     Request::probe(from, 0, MPI_COMM_WORLD, &status);
284     size=status.count;
285   }
286
287   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
288
289   TRACE_smpi_comm_out(rank);
290   if (not TRACE_smpi_view_internals()) {
291     TRACE_smpi_recv(src_traced, rank, 0);
292   }
293
294   log_timed_action (action, clock);
295 }
296
297 static void action_Irecv(const char *const *action)
298 {
299   CHECK_ACTION_PARAMS(action, 2, 1)
300   int from = atoi(action[2]);
301   double size=parse_double(action[3]);
302   double clock = smpi_process()->simulated_elapsed();
303
304   if(action[4])
305     MPI_CURRENT_TYPE=decode_datatype(action[4]);
306   else
307     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
308
309   int rank = smpi_process()->index();
310   int src_traced = MPI_COMM_WORLD->group()->rank(from);
311   TRACE_smpi_comm_in(rank, __FUNCTION__,
312                      new simgrid::instr::Pt2PtTIData("Irecv", src_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
313   MPI_Status status;
314   //unknow size from the receiver pov
315   if (size <= 0.0) {
316     Request::probe(from, 0, MPI_COMM_WORLD, &status);
317     size = status.count;
318   }
319
320   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
321
322   TRACE_smpi_comm_out(rank);
323   get_reqq_self()->push_back(request);
324
325   log_timed_action (action, clock);
326 }
327
328 static void action_test(const char* const* action)
329 {
330   CHECK_ACTION_PARAMS(action, 0, 0)
331   double clock = smpi_process()->simulated_elapsed();
332   MPI_Status status;
333
334   MPI_Request request = get_reqq_self()->back();
335   get_reqq_self()->pop_back();
336   //if request is null here, this may mean that a previous test has succeeded
337   //Different times in traced application and replayed version may lead to this
338   //In this case, ignore the extra calls.
339   if(request!=nullptr){
340     int rank = smpi_process()->index();
341     TRACE_smpi_testing_in(rank);
342
343     int flag = Request::test(&request, &status);
344
345     XBT_DEBUG("MPI_Test result: %d", flag);
346     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
347     get_reqq_self()->push_back(request);
348
349     TRACE_smpi_testing_out(rank);
350   }
351   log_timed_action (action, clock);
352 }
353
354 static void action_wait(const char *const *action){
355   CHECK_ACTION_PARAMS(action, 0, 0)
356   double clock = smpi_process()->simulated_elapsed();
357   MPI_Status status;
358
359   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
360       xbt_str_join_array(action," "));
361   MPI_Request request = get_reqq_self()->back();
362   get_reqq_self()->pop_back();
363
364   if (request==nullptr){
365     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
366     return;
367   }
368
369   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
370
371   MPI_Group group = request->comm()->group();
372   int src_traced = group->rank(request->src());
373   int dst_traced = group->rank(request->dst());
374   int is_wait_for_receive = (request->flags() & RECV);
375   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
376
377   Request::wait(&request, &status);
378
379   TRACE_smpi_comm_out(rank);
380   if (is_wait_for_receive)
381     TRACE_smpi_recv(src_traced, dst_traced, 0);
382   log_timed_action (action, clock);
383 }
384
385 static void action_waitall(const char *const *action){
386   CHECK_ACTION_PARAMS(action, 0, 0)
387   double clock = smpi_process()->simulated_elapsed();
388   const unsigned int count_requests = get_reqq_self()->size();
389
390   if (count_requests>0) {
391     MPI_Status status[count_requests];
392
393    int rank_traced = smpi_process()->index();
394    TRACE_smpi_comm_in(rank_traced, __FUNCTION__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
395    int recvs_snd[count_requests];
396    int recvs_rcv[count_requests];
397    for (unsigned int i = 0; i < count_requests; i++) {
398      const auto& req = (*get_reqq_self())[i];
399      if (req && (req->flags () & RECV)){
400        recvs_snd[i]=req->src();
401        recvs_rcv[i]=req->dst();
402      }else
403        recvs_snd[i]=-100;
404    }
405    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
406
407    for (unsigned i = 0; i < count_requests; i++) {
408      if (recvs_snd[i]!=-100)
409        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
410    }
411    TRACE_smpi_comm_out(rank_traced);
412   }
413   log_timed_action (action, clock);
414 }
415
416 static void action_barrier(const char *const *action){
417   double clock = smpi_process()->simulated_elapsed();
418   int rank = smpi_process()->index();
419   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
420
421   Colls::barrier(MPI_COMM_WORLD);
422
423   TRACE_smpi_comm_out(rank);
424   log_timed_action (action, clock);
425 }
426
427 static void action_bcast(const char *const *action)
428 {
429   CHECK_ACTION_PARAMS(action, 1, 2)
430   double size = parse_double(action[2]);
431   double clock = smpi_process()->simulated_elapsed();
432   int root=0;
433   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
434   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
435
436   if(action[3]) {
437     root= atoi(action[3]);
438     if(action[4])
439       MPI_CURRENT_TYPE=decode_datatype(action[4]);
440   }
441
442   int rank = smpi_process()->index();
443   TRACE_smpi_comm_in(rank, __FUNCTION__,
444                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->index(root), -1.0, size, -1,
445                                                     encode_datatype(MPI_CURRENT_TYPE), ""));
446
447   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
448
449   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
450
451   TRACE_smpi_comm_out(rank);
452   log_timed_action (action, clock);
453 }
454
455 static void action_reduce(const char *const *action)
456 {
457   CHECK_ACTION_PARAMS(action, 2, 2)
458   double comm_size = parse_double(action[2]);
459   double comp_size = parse_double(action[3]);
460   double clock = smpi_process()->simulated_elapsed();
461   int root=0;
462   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
463
464   if(action[4]) {
465     root= atoi(action[4]);
466     if(action[5])
467       MPI_CURRENT_TYPE=decode_datatype(action[5]);
468   }
469
470   int rank = smpi_process()->index();
471   TRACE_smpi_comm_in(rank, __FUNCTION__,
472                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->index(root), comp_size,
473                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
474
475   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
476   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
478   smpi_execute_flops(comp_size);
479
480   TRACE_smpi_comm_out(rank);
481   log_timed_action (action, clock);
482 }
483
484 static void action_allReduce(const char *const *action) {
485   CHECK_ACTION_PARAMS(action, 2, 1)
486   double comm_size = parse_double(action[2]);
487   double comp_size = parse_double(action[3]);
488
489   if(action[4])
490     MPI_CURRENT_TYPE=decode_datatype(action[4]);
491   else
492     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
493
494   double clock = smpi_process()->simulated_elapsed();
495   int rank = smpi_process()->index();
496   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
497                                                                         encode_datatype(MPI_CURRENT_TYPE), ""));
498
499   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
500   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
501   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
502   smpi_execute_flops(comp_size);
503
504   TRACE_smpi_comm_out(rank);
505   log_timed_action (action, clock);
506 }
507
508 static void action_allToAll(const char *const *action) {
509   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
510   double clock = smpi_process()->simulated_elapsed();
511   int comm_size = MPI_COMM_WORLD->size();
512   int send_size = parse_double(action[2]);
513   int recv_size = parse_double(action[3]);
514   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
515
516   if(action[4] && action[5]) {
517     MPI_CURRENT_TYPE=decode_datatype(action[4]);
518     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
519   }
520   else
521     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
522
523   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
524   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
525
526   int rank = smpi_process()->index();
527   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
528                                                                         encode_datatype(MPI_CURRENT_TYPE),
529                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
530
531   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
532
533   TRACE_smpi_comm_out(rank);
534   log_timed_action (action, clock);
535 }
536
537 static void action_gather(const char *const *action) {
538   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
539         0 gather 68 68 0 0 0
540       where:
541         1) 68 is the sendcounts
542         2) 68 is the recvcounts
543         3) 0 is the root node
544         4) 0 is the send datatype id, see decode_datatype()
545         5) 0 is the recv datatype id, see decode_datatype()
546   */
547   CHECK_ACTION_PARAMS(action, 2, 3)
548   double clock = smpi_process()->simulated_elapsed();
549   int comm_size = MPI_COMM_WORLD->size();
550   int send_size = parse_double(action[2]);
551   int recv_size = parse_double(action[3]);
552   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
553   if(action[4] && action[5]) {
554     MPI_CURRENT_TYPE=decode_datatype(action[5]);
555     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
556   } else {
557     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
558   }
559   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
560   void *recv = nullptr;
561   int root=0;
562   if(action[4])
563     root=atoi(action[4]);
564   int rank = MPI_COMM_WORLD->rank();
565
566   if(rank==root)
567     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
568
569   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
570                                                                         encode_datatype(MPI_CURRENT_TYPE),
571                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
572
573   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
574
575   TRACE_smpi_comm_out(smpi_process()->index());
576   log_timed_action (action, clock);
577 }
578
579 static void action_scatter(const char* const* action)
580 {
581   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
582         0 gather 68 68 0 0 0
583       where:
584         1) 68 is the sendcounts
585         2) 68 is the recvcounts
586         3) 0 is the root node
587         4) 0 is the send datatype id, see decode_datatype()
588         5) 0 is the recv datatype id, see decode_datatype()
589   */
590   CHECK_ACTION_PARAMS(action, 2, 3)
591   double clock                   = smpi_process()->simulated_elapsed();
592   int comm_size                  = MPI_COMM_WORLD->size();
593   int send_size                  = parse_double(action[2]);
594   int recv_size                  = parse_double(action[3]);
595   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
596   if (action[4] && action[5]) {
597     MPI_CURRENT_TYPE  = decode_datatype(action[5]);
598     MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
599   } else {
600     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
601   }
602   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
603   void* recv = nullptr;
604   int root   = 0;
605   if (action[4])
606     root   = atoi(action[4]);
607   int rank = MPI_COMM_WORLD->rank();
608
609   if (rank == root)
610     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
611
612   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
613                                                                         encode_datatype(MPI_CURRENT_TYPE),
614                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
615
616   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
617
618   TRACE_smpi_comm_out(smpi_process()->index());
619   log_timed_action(action, clock);
620 }
621
622 static void action_gatherv(const char *const *action) {
623   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
624        0 gather 68 68 10 10 10 0 0 0
625      where:
626        1) 68 is the sendcount
627        2) 68 10 10 10 is the recvcounts
628        3) 0 is the root node
629        4) 0 is the send datatype id, see decode_datatype()
630        5) 0 is the recv datatype id, see decode_datatype()
631   */
632   double clock = smpi_process()->simulated_elapsed();
633   int comm_size = MPI_COMM_WORLD->size();
634   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
635   int send_size = parse_double(action[2]);
636   int disps[comm_size];
637   int recvcounts[comm_size];
638   int recv_sum=0;
639
640   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
641   if(action[4+comm_size] && action[5+comm_size]) {
642     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
643     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
644   } else
645     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
646
647   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
648   void *recv = nullptr;
649   for(int i=0;i<comm_size;i++) {
650     recvcounts[i] = atoi(action[i+3]);
651     recv_sum=recv_sum+recvcounts[i];
652     disps[i]=0;
653   }
654
655   int root=atoi(action[3+comm_size]);
656   int rank = MPI_COMM_WORLD->rank();
657
658   if(rank==root)
659     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
660
661   std::vector<int>* trace_recvcounts = new std::vector<int>;
662   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
663     trace_recvcounts->push_back(recvcounts[i]);
664
665   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
666                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
667                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
668
669   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
670
671   TRACE_smpi_comm_out(smpi_process()->index());
672   log_timed_action (action, clock);
673 }
674
675 static void action_scatterv(const char* const* action)
676 {
677   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
678        0 gather 68 10 10 10 68 0 0 0
679      where:
680        1) 68 10 10 10 is the sendcounts
681        2) 68 is the recvcount
682        3) 0 is the root node
683        4) 0 is the send datatype id, see decode_datatype()
684        5) 0 is the recv datatype id, see decode_datatype()
685   */
686   double clock  = smpi_process()->simulated_elapsed();
687   int comm_size = MPI_COMM_WORLD->size();
688   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
689   int recv_size = parse_double(action[2 + comm_size]);
690   int disps[comm_size];
691   int sendcounts[comm_size];
692   int send_sum = 0;
693
694   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
695   if (action[4 + comm_size] && action[5 + comm_size]) {
696     MPI_CURRENT_TYPE  = decode_datatype(action[4 + comm_size]);
697     MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
698   } else
699     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
700
701   void* send = nullptr;
702   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
703   for (int i = 0; i < comm_size; i++) {
704     sendcounts[i] = atoi(action[i + 2]);
705     send_sum += sendcounts[i];
706     disps[i] = 0;
707   }
708
709   int root = atoi(action[3 + comm_size]);
710   int rank = MPI_COMM_WORLD->rank();
711
712   if (rank == root)
713     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
714
715   std::vector<int>* trace_sendcounts = new std::vector<int>;
716   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
717     trace_sendcounts->push_back(sendcounts[i]);
718
719   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
720                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
721                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
722
723   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
724
725   TRACE_smpi_comm_out(smpi_process()->index());
726   log_timed_action(action, clock);
727 }
728
729 static void action_reducescatter(const char *const *action) {
730  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
731       0 reduceScatter 275427 275427 275427 204020 11346849 0
732     where:
733       1) The first four values after the name of the action declare the recvcounts array
734       2) The value 11346849 is the amount of instructions
735       3) The last value corresponds to the datatype, see decode_datatype().
736 */
737   double clock = smpi_process()->simulated_elapsed();
738   int comm_size = MPI_COMM_WORLD->size();
739   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
740   int comp_size = parse_double(action[2+comm_size]);
741   int recvcounts[comm_size];
742   int rank = smpi_process()->index();
743   int size = 0;
744   std::vector<int>* trace_recvcounts = new std::vector<int>;
745   if(action[3+comm_size])
746     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
747   else
748     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
749
750   for(int i=0;i<comm_size;i++) {
751     recvcounts[i] = atoi(action[i+2]);
752     trace_recvcounts->push_back(recvcounts[i]);
753     size+=recvcounts[i];
754   }
755
756   TRACE_smpi_comm_in(rank, __FUNCTION__,
757                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
758                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
759                                                        encode_datatype(MPI_CURRENT_TYPE)));
760
761   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
762   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
763
764   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
765   smpi_execute_flops(comp_size);
766
767   TRACE_smpi_comm_out(rank);
768   log_timed_action (action, clock);
769 }
770
771 static void action_allgather(const char *const *action) {
772   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
773         0 allGather 275427 275427
774     where:
775         1) 275427 is the sendcount
776         2) 275427 is the recvcount
777         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
778   */
779   double clock = smpi_process()->simulated_elapsed();
780
781   CHECK_ACTION_PARAMS(action, 2, 2)
782   int sendcount=atoi(action[2]);
783   int recvcount=atoi(action[3]);
784
785   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
786
787   if(action[4] && action[5]) {
788     MPI_CURRENT_TYPE = decode_datatype(action[4]);
789     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
790   } else
791     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
792
793   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
794   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
795
796   int rank = smpi_process()->index();
797
798   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
799                                                                         encode_datatype(MPI_CURRENT_TYPE),
800                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
801
802   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
803
804   TRACE_smpi_comm_out(rank);
805   log_timed_action (action, clock);
806 }
807
808 static void action_allgatherv(const char *const *action) {
809   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
810         0 allGatherV 275427 275427 275427 275427 204020
811      where:
812         1) 275427 is the sendcount
813         2) The next four elements declare the recvcounts array
814         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
815   */
816   double clock = smpi_process()->simulated_elapsed();
817
818   int comm_size = MPI_COMM_WORLD->size();
819   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
820   int sendcount=atoi(action[2]);
821   int recvcounts[comm_size];
822   int disps[comm_size];
823   int recv_sum=0;
824   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
825
826   if(action[3+comm_size] && action[4+comm_size]) {
827     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
828     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
829   } else
830     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
831
832   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
833
834   for(int i=0;i<comm_size;i++) {
835     recvcounts[i] = atoi(action[i+3]);
836     recv_sum=recv_sum+recvcounts[i];
837     disps[i] = 0;
838   }
839   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
840
841   int rank = smpi_process()->index();
842
843   std::vector<int>* trace_recvcounts = new std::vector<int>;
844   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
845     trace_recvcounts->push_back(recvcounts[i]);
846
847   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
848                                              "allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
849                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
850
851   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
852                           MPI_COMM_WORLD);
853
854   TRACE_smpi_comm_out(rank);
855   log_timed_action (action, clock);
856 }
857
858 static void action_allToAllv(const char *const *action) {
859   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
860         0 allToAllV 100 1 7 10 12 100 1 70 10 5
861      where:
862         1) 100 is the size of the send buffer *sizeof(int),
863         2) 1 7 10 12 is the sendcounts array
864         3) 100*sizeof(int) is the size of the receiver buffer
865         4)  1 70 10 5 is the recvcounts array
866   */
867   double clock = smpi_process()->simulated_elapsed();
868
869   int comm_size = MPI_COMM_WORLD->size();
870   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
871   int send_size = 0;
872   int recv_size = 0;
873   int sendcounts[comm_size];
874   std::vector<int>* trace_sendcounts = new std::vector<int>;
875   int recvcounts[comm_size];
876   std::vector<int>* trace_recvcounts = new std::vector<int>;
877   int senddisps[comm_size];
878   int recvdisps[comm_size];
879
880   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
881
882   int send_buf_size=parse_double(action[2]);
883   int recv_buf_size=parse_double(action[3+comm_size]);
884   if(action[4+2*comm_size] && action[5+2*comm_size]) {
885     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
886     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
887   }
888   else
889     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
890
891   int rank       = smpi_process()->index();
892   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
893   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
894
895   for(int i=0;i<comm_size;i++) {
896     sendcounts[i] = atoi(action[i+3]);
897     trace_sendcounts->push_back(sendcounts[i]);
898     send_size += sendcounts[i];
899     recvcounts[i] = atoi(action[i+4+comm_size]);
900     trace_recvcounts->push_back(recvcounts[i]);
901     recv_size += recvcounts[i];
902     senddisps[i] = 0;
903     recvdisps[i] = 0;
904   }
905
906   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
907                                              "allToAllV", -1, send_size, trace_sendcounts, recv_size, trace_recvcounts,
908                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
909
910   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
911                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
912
913   TRACE_smpi_comm_out(rank);
914   log_timed_action (action, clock);
915 }
916
917 }} // namespace simgrid::smpi
918
919 /** @brief Only initialize the replay, don't do it for real */
920 void smpi_replay_init(int* argc, char*** argv)
921 {
922   simgrid::smpi::Process::init(argc, argv);
923   smpi_process()->mark_as_initialized();
924   smpi_process()->set_replaying(true);
925
926   int rank = smpi_process()->index();
927   TRACE_smpi_init(rank);
928   TRACE_smpi_computing_init(rank);
929   TRACE_smpi_comm_in(rank, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
930   TRACE_smpi_comm_out(rank);
931   xbt_replay_action_register("init",       simgrid::smpi::action_init);
932   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
933   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
934   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
935   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
936   xbt_replay_action_register("send",       simgrid::smpi::action_send);
937   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
938   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
939   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
940   xbt_replay_action_register("test",       simgrid::smpi::action_test);
941   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
942   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
943   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
944   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
945   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
946   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
947   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
948   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
949   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
950   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
951   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
952   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
953   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
954   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
955   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
956   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
957
958   //if we have a delayed start, sleep here.
959   if(*argc>2){
960     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
961     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
962     smpi_execute_flops(value);
963   } else {
964     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
965     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
966     smpi_execute_flops(0.0);
967   }
968 }
969
970 /** @brief actually run the replay after initialization */
971 void smpi_replay_main(int* argc, char*** argv)
972 {
973   simgrid::xbt::replay_runner(*argc, *argv);
974
975   /* and now, finalize everything */
976   /* One active process will stop. Decrease the counter*/
977   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
978   if (not get_reqq_self()->empty()) {
979     unsigned int count_requests=get_reqq_self()->size();
980     MPI_Request requests[count_requests];
981     MPI_Status status[count_requests];
982     unsigned int i=0;
983
984     for (auto const& req : *get_reqq_self()) {
985       requests[i] = req;
986       i++;
987     }
988     simgrid::smpi::Request::waitall(count_requests, requests, status);
989   }
990   delete get_reqq_self();
991   active_processes--;
992
993   if(active_processes==0){
994     /* Last process alive speaking: end the simulated timer */
995     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
996     xbt_free(sendbuffer);
997     xbt_free(recvbuffer);
998   }
999
1000   TRACE_smpi_comm_in(smpi_process()->index(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1001
1002   smpi_process()->finalize();
1003
1004   TRACE_smpi_comm_out(smpi_process()->index());
1005   TRACE_smpi_finalize(smpi_process()->index());
1006 }
1007
1008 /** @brief chain a replay initialization and a replay start */
1009 void smpi_replay_run(int* argc, char*** argv)
1010 {
1011   smpi_replay_init(argc, argv);
1012   smpi_replay_main(argc, argv);
1013 }