Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
c59a810d60dbfb48cbf752dfc23154a08013e402
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 int communicator_size = 0;
23 static int active_processes = 0;
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
25
26 MPI_Datatype MPI_DEFAULT_TYPE;
27 MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(smpi_process()->index());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({smpi_process()->index(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90
91 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
92 static MPI_Datatype decode_datatype(const char *const action)
93 {
94   switch(atoi(action)) {
95     case 0:
96       MPI_CURRENT_TYPE=MPI_DOUBLE;
97       break;
98     case 1:
99       MPI_CURRENT_TYPE=MPI_INT;
100       break;
101     case 2:
102       MPI_CURRENT_TYPE=MPI_CHAR;
103       break;
104     case 3:
105       MPI_CURRENT_TYPE=MPI_SHORT;
106       break;
107     case 4:
108       MPI_CURRENT_TYPE=MPI_LONG;
109       break;
110     case 5:
111       MPI_CURRENT_TYPE=MPI_FLOAT;
112       break;
113     case 6:
114       MPI_CURRENT_TYPE=MPI_BYTE;
115       break;
116     default:
117       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118       break;
119   }
120    return MPI_CURRENT_TYPE;
121 }
122
123 const char* encode_datatype(MPI_Datatype datatype)
124 {
125   if (datatype==MPI_BYTE)
126       return "";
127   if(datatype==MPI_DOUBLE)
128       return "0";
129   if(datatype==MPI_INT)
130       return "1";
131   if(datatype==MPI_CHAR)
132       return "2";
133   if(datatype==MPI_SHORT)
134       return "3";
135   if(datatype==MPI_LONG)
136     return "4";
137   if(datatype==MPI_FLOAT)
138       return "5";
139   // default - not implemented.
140   // do not warn here as we pass in this function even for other trace formats
141   return "-1";
142 }
143
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145     int i=0;\
146     while(action[i]!=nullptr)\
147      i++;\
148     if(i<mandatory+2)                                           \
149     THROWF(arg_error, 0, "%s replay failed.\n" \
150           "%d items were given on the line. First two should be process_id and action.  " \
151           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
153   }
154
155 namespace simgrid {
156 namespace smpi {
157
158 static void action_init(const char *const *action)
159 {
160   XBT_DEBUG("Initialize the counters");
161   CHECK_ACTION_PARAMS(action, 0, 1)
162   if(action[2])
163     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
164   else
165     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process()->simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* Nothing to do */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   communicator_size = parse_double(action[2]);
183   log_timed_action (action, smpi_process()->simulated_elapsed());
184 }
185
186 static void action_comm_split(const char *const *action)
187 {
188   log_timed_action (action, smpi_process()->simulated_elapsed());
189 }
190
191 static void action_comm_dup(const char *const *action)
192 {
193   log_timed_action (action, smpi_process()->simulated_elapsed());
194 }
195
196 static void action_compute(const char *const *action)
197 {
198   CHECK_ACTION_PARAMS(action, 1, 0)
199   double clock = smpi_process()->simulated_elapsed();
200   double flops= parse_double(action[2]);
201   int rank = smpi_process()->index();
202
203   TRACE_smpi_computing_in(rank, flops);
204   smpi_execute_flops(flops);
205   TRACE_smpi_computing_out(rank);
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_send(const char *const *action)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   int to = atoi(action[2]);
214   double size=parse_double(action[3]);
215   double clock = smpi_process()->simulated_elapsed();
216
217   if(action[4])
218     MPI_CURRENT_TYPE=decode_datatype(action[4]);
219   else
220     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
221
222   int rank = smpi_process()->index();
223   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
224
225   TRACE_smpi_comm_in(rank, __FUNCTION__,
226                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
227   if (not TRACE_smpi_view_internals())
228     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
229
230   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
231
232   TRACE_smpi_comm_out(rank);
233
234   log_timed_action(action, clock);
235 }
236
237 static void action_Isend(const char *const *action)
238 {
239   CHECK_ACTION_PARAMS(action, 2, 1)
240   int to = atoi(action[2]);
241   double size=parse_double(action[3]);
242   double clock = smpi_process()->simulated_elapsed();
243
244   if(action[4])
245     MPI_CURRENT_TYPE=decode_datatype(action[4]);
246   else
247     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
248
249   int rank = smpi_process()->index();
250   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
251   TRACE_smpi_comm_in(rank, __FUNCTION__,
252                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
253   if (not TRACE_smpi_view_internals())
254     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
255
256   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
257
258   TRACE_smpi_comm_out(rank);
259
260   get_reqq_self()->push_back(request);
261
262   log_timed_action (action, clock);
263 }
264
265 static void action_recv(const char *const *action) {
266   CHECK_ACTION_PARAMS(action, 2, 1)
267   int from = atoi(action[2]);
268   double size=parse_double(action[3]);
269   double clock = smpi_process()->simulated_elapsed();
270   MPI_Status status;
271
272   if(action[4])
273     MPI_CURRENT_TYPE=decode_datatype(action[4]);
274   else
275     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
276
277   int rank = smpi_process()->index();
278   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
279
280   TRACE_smpi_comm_in(rank, __FUNCTION__,
281                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
282
283   //unknown size from the receiver point of view
284   if (size <= 0.0) {
285     Request::probe(from, 0, MPI_COMM_WORLD, &status);
286     size=status.count;
287   }
288
289   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
290
291   TRACE_smpi_comm_out(rank);
292   if (not TRACE_smpi_view_internals()) {
293     TRACE_smpi_recv(src_traced, rank, 0);
294   }
295
296   log_timed_action (action, clock);
297 }
298
299 static void action_Irecv(const char *const *action)
300 {
301   CHECK_ACTION_PARAMS(action, 2, 1)
302   int from = atoi(action[2]);
303   double size=parse_double(action[3]);
304   double clock = smpi_process()->simulated_elapsed();
305
306   if(action[4])
307     MPI_CURRENT_TYPE=decode_datatype(action[4]);
308   else
309     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
310
311   int rank = smpi_process()->index();
312   TRACE_smpi_comm_in(rank, __FUNCTION__,
313                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
314   MPI_Status status;
315   //unknow size from the receiver pov
316   if (size <= 0.0) {
317     Request::probe(from, 0, MPI_COMM_WORLD, &status);
318     size = status.count;
319   }
320
321   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
322
323   TRACE_smpi_comm_out(rank);
324   get_reqq_self()->push_back(request);
325
326   log_timed_action (action, clock);
327 }
328
329 static void action_test(const char* const* action)
330 {
331   CHECK_ACTION_PARAMS(action, 0, 0)
332   double clock = smpi_process()->simulated_elapsed();
333   MPI_Status status;
334
335   MPI_Request request = get_reqq_self()->back();
336   get_reqq_self()->pop_back();
337   //if request is null here, this may mean that a previous test has succeeded
338   //Different times in traced application and replayed version may lead to this
339   //In this case, ignore the extra calls.
340   if(request!=nullptr){
341     int rank = smpi_process()->index();
342     TRACE_smpi_testing_in(rank);
343
344     int flag = Request::test(&request, &status);
345
346     XBT_DEBUG("MPI_Test result: %d", flag);
347     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
348     get_reqq_self()->push_back(request);
349
350     TRACE_smpi_testing_out(rank);
351   }
352   log_timed_action (action, clock);
353 }
354
355 static void action_wait(const char *const *action){
356   CHECK_ACTION_PARAMS(action, 0, 0)
357   double clock = smpi_process()->simulated_elapsed();
358   MPI_Status status;
359
360   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
361       xbt_str_join_array(action," "));
362   MPI_Request request = get_reqq_self()->back();
363   get_reqq_self()->pop_back();
364
365   if (request==nullptr){
366     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
367     return;
368   }
369
370   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
371
372   MPI_Group group = request->comm()->group();
373   int src_traced = group->rank(request->src());
374   int dst_traced = group->rank(request->dst());
375   int is_wait_for_receive = (request->flags() & RECV);
376   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
377
378   Request::wait(&request, &status);
379
380   TRACE_smpi_comm_out(rank);
381   if (is_wait_for_receive)
382     TRACE_smpi_recv(src_traced, dst_traced, 0);
383   log_timed_action (action, clock);
384 }
385
386 static void action_waitall(const char *const *action){
387   CHECK_ACTION_PARAMS(action, 0, 0)
388   double clock = smpi_process()->simulated_elapsed();
389   const unsigned int count_requests = get_reqq_self()->size();
390
391   if (count_requests>0) {
392     MPI_Status status[count_requests];
393
394    int rank_traced = smpi_process()->index();
395    TRACE_smpi_comm_in(rank_traced, __FUNCTION__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
396    int recvs_snd[count_requests];
397    int recvs_rcv[count_requests];
398    for (unsigned int i = 0; i < count_requests; i++) {
399      const auto& req = (*get_reqq_self())[i];
400      if (req && (req->flags () & RECV)){
401        recvs_snd[i]=req->src();
402        recvs_rcv[i]=req->dst();
403      }else
404        recvs_snd[i]=-100;
405    }
406    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
407
408    for (unsigned i = 0; i < count_requests; i++) {
409      if (recvs_snd[i]!=-100)
410        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
411    }
412    TRACE_smpi_comm_out(rank_traced);
413   }
414   log_timed_action (action, clock);
415 }
416
417 static void action_barrier(const char *const *action){
418   double clock = smpi_process()->simulated_elapsed();
419   int rank = smpi_process()->index();
420   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
421
422   Colls::barrier(MPI_COMM_WORLD);
423
424   TRACE_smpi_comm_out(rank);
425   log_timed_action (action, clock);
426 }
427
428 static void action_bcast(const char *const *action)
429 {
430   CHECK_ACTION_PARAMS(action, 1, 2)
431   double size = parse_double(action[2]);
432   double clock = smpi_process()->simulated_elapsed();
433   int root=0;
434   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
435   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
436
437   if(action[3]) {
438     root= atoi(action[3]);
439     if(action[4])
440       MPI_CURRENT_TYPE=decode_datatype(action[4]);
441   }
442
443   int rank = smpi_process()->index();
444   TRACE_smpi_comm_in(rank, __FUNCTION__,
445                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size, -1,
446                                                     encode_datatype(MPI_CURRENT_TYPE), ""));
447
448   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
449
450   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
451
452   TRACE_smpi_comm_out(rank);
453   log_timed_action (action, clock);
454 }
455
456 static void action_reduce(const char *const *action)
457 {
458   CHECK_ACTION_PARAMS(action, 2, 2)
459   double comm_size = parse_double(action[2]);
460   double comp_size = parse_double(action[3]);
461   double clock = smpi_process()->simulated_elapsed();
462   int root=0;
463   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
464
465   if(action[4]) {
466     root= atoi(action[4]);
467     if(action[5])
468       MPI_CURRENT_TYPE=decode_datatype(action[5]);
469   }
470
471   int rank = smpi_process()->index();
472   TRACE_smpi_comm_in(rank, __FUNCTION__,
473                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
474                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
475
476   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
479   smpi_execute_flops(comp_size);
480
481   TRACE_smpi_comm_out(rank);
482   log_timed_action (action, clock);
483 }
484
485 static void action_allReduce(const char *const *action) {
486   CHECK_ACTION_PARAMS(action, 2, 1)
487   double comm_size = parse_double(action[2]);
488   double comp_size = parse_double(action[3]);
489
490   if(action[4])
491     MPI_CURRENT_TYPE=decode_datatype(action[4]);
492   else
493     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
494
495   double clock = smpi_process()->simulated_elapsed();
496   int rank = smpi_process()->index();
497   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
498                                                                         encode_datatype(MPI_CURRENT_TYPE), ""));
499
500   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
501   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
502   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
503   smpi_execute_flops(comp_size);
504
505   TRACE_smpi_comm_out(rank);
506   log_timed_action (action, clock);
507 }
508
509 static void action_allToAll(const char *const *action) {
510   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
511   double clock = smpi_process()->simulated_elapsed();
512   int comm_size = MPI_COMM_WORLD->size();
513   int send_size = parse_double(action[2]);
514   int recv_size = parse_double(action[3]);
515   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
516
517   if(action[4] && action[5]) {
518     MPI_CURRENT_TYPE=decode_datatype(action[4]);
519     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
520   }
521   else
522     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
523
524   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
525   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
526
527   int rank = smpi_process()->index();
528   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
529                                                                         encode_datatype(MPI_CURRENT_TYPE),
530                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
531
532   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
533
534   TRACE_smpi_comm_out(rank);
535   log_timed_action (action, clock);
536 }
537
538 static void action_gather(const char *const *action) {
539   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
540         0 gather 68 68 0 0 0
541       where:
542         1) 68 is the sendcounts
543         2) 68 is the recvcounts
544         3) 0 is the root node
545         4) 0 is the send datatype id, see decode_datatype()
546         5) 0 is the recv datatype id, see decode_datatype()
547   */
548   CHECK_ACTION_PARAMS(action, 2, 3)
549   double clock = smpi_process()->simulated_elapsed();
550   int comm_size = MPI_COMM_WORLD->size();
551   int send_size = parse_double(action[2]);
552   int recv_size = parse_double(action[3]);
553   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
554   if(action[4] && action[5]) {
555     MPI_CURRENT_TYPE=decode_datatype(action[5]);
556     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
557   } else {
558     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
559   }
560   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
561   void *recv = nullptr;
562   int root=0;
563   if(action[4])
564     root=atoi(action[4]);
565   int rank = MPI_COMM_WORLD->rank();
566
567   if(rank==root)
568     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
569
570   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
571                                                                         encode_datatype(MPI_CURRENT_TYPE),
572                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
573
574   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
575
576   TRACE_smpi_comm_out(smpi_process()->index());
577   log_timed_action (action, clock);
578 }
579
580 static void action_scatter(const char* const* action)
581 {
582   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
583         0 gather 68 68 0 0 0
584       where:
585         1) 68 is the sendcounts
586         2) 68 is the recvcounts
587         3) 0 is the root node
588         4) 0 is the send datatype id, see decode_datatype()
589         5) 0 is the recv datatype id, see decode_datatype()
590   */
591   CHECK_ACTION_PARAMS(action, 2, 3)
592   double clock                   = smpi_process()->simulated_elapsed();
593   int comm_size                  = MPI_COMM_WORLD->size();
594   int send_size                  = parse_double(action[2]);
595   int recv_size                  = parse_double(action[3]);
596   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
597   if (action[4] && action[5]) {
598     MPI_CURRENT_TYPE  = decode_datatype(action[5]);
599     MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
600   } else {
601     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
602   }
603   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
604   void* recv = nullptr;
605   int root   = 0;
606   if (action[4])
607     root   = atoi(action[4]);
608   int rank = MPI_COMM_WORLD->rank();
609
610   if (rank == root)
611     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
612
613   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
614                                                                         encode_datatype(MPI_CURRENT_TYPE),
615                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
616
617   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
618
619   TRACE_smpi_comm_out(smpi_process()->index());
620   log_timed_action(action, clock);
621 }
622
623 static void action_gatherv(const char *const *action) {
624   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
625        0 gather 68 68 10 10 10 0 0 0
626      where:
627        1) 68 is the sendcount
628        2) 68 10 10 10 is the recvcounts
629        3) 0 is the root node
630        4) 0 is the send datatype id, see decode_datatype()
631        5) 0 is the recv datatype id, see decode_datatype()
632   */
633   double clock = smpi_process()->simulated_elapsed();
634   int comm_size = MPI_COMM_WORLD->size();
635   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
636   int send_size = parse_double(action[2]);
637   int disps[comm_size];
638   int recvcounts[comm_size];
639   int recv_sum=0;
640
641   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
642   if(action[4+comm_size] && action[5+comm_size]) {
643     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
644     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
645   } else
646     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
647
648   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
649   void *recv = nullptr;
650   for(int i=0;i<comm_size;i++) {
651     recvcounts[i] = atoi(action[i+3]);
652     recv_sum=recv_sum+recvcounts[i];
653     disps[i]=0;
654   }
655
656   int root=atoi(action[3+comm_size]);
657   int rank = MPI_COMM_WORLD->rank();
658
659   if(rank==root)
660     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
661
662   std::vector<int>* trace_recvcounts = new std::vector<int>;
663   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
664     trace_recvcounts->push_back(recvcounts[i]);
665
666   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
667                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
668                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
669
670   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
671
672   TRACE_smpi_comm_out(smpi_process()->index());
673   log_timed_action (action, clock);
674 }
675
676 static void action_scatterv(const char* const* action)
677 {
678   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
679        0 gather 68 10 10 10 68 0 0 0
680      where:
681        1) 68 10 10 10 is the sendcounts
682        2) 68 is the recvcount
683        3) 0 is the root node
684        4) 0 is the send datatype id, see decode_datatype()
685        5) 0 is the recv datatype id, see decode_datatype()
686   */
687   double clock  = smpi_process()->simulated_elapsed();
688   int comm_size = MPI_COMM_WORLD->size();
689   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
690   int recv_size = parse_double(action[2 + comm_size]);
691   int disps[comm_size];
692   int sendcounts[comm_size];
693   int send_sum = 0;
694
695   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
696   if (action[4 + comm_size] && action[5 + comm_size]) {
697     MPI_CURRENT_TYPE  = decode_datatype(action[4 + comm_size]);
698     MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
699   } else
700     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
701
702   void* send = nullptr;
703   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
704   for (int i = 0; i < comm_size; i++) {
705     sendcounts[i] = atoi(action[i + 2]);
706     send_sum += sendcounts[i];
707     disps[i] = 0;
708   }
709
710   int root = atoi(action[3 + comm_size]);
711   int rank = MPI_COMM_WORLD->rank();
712
713   if (rank == root)
714     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
715
716   std::vector<int>* trace_sendcounts = new std::vector<int>;
717   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
718     trace_sendcounts->push_back(sendcounts[i]);
719
720   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
721                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
722                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
723
724   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
725
726   TRACE_smpi_comm_out(smpi_process()->index());
727   log_timed_action(action, clock);
728 }
729
730 static void action_reducescatter(const char *const *action) {
731  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
732       0 reduceScatter 275427 275427 275427 204020 11346849 0
733     where:
734       1) The first four values after the name of the action declare the recvcounts array
735       2) The value 11346849 is the amount of instructions
736       3) The last value corresponds to the datatype, see decode_datatype().
737 */
738   double clock = smpi_process()->simulated_elapsed();
739   int comm_size = MPI_COMM_WORLD->size();
740   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
741   int comp_size = parse_double(action[2+comm_size]);
742   int recvcounts[comm_size];
743   int rank = smpi_process()->index();
744   int size = 0;
745   std::vector<int>* trace_recvcounts = new std::vector<int>;
746   if(action[3+comm_size])
747     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
748   else
749     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
750
751   for(int i=0;i<comm_size;i++) {
752     recvcounts[i] = atoi(action[i+2]);
753     trace_recvcounts->push_back(recvcounts[i]);
754     size+=recvcounts[i];
755   }
756
757   TRACE_smpi_comm_in(rank, __FUNCTION__,
758                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
759                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
760                                                        encode_datatype(MPI_CURRENT_TYPE)));
761
762   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
763   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
764
765   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
766   smpi_execute_flops(comp_size);
767
768   TRACE_smpi_comm_out(rank);
769   log_timed_action (action, clock);
770 }
771
772 static void action_allgather(const char *const *action) {
773   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
774         0 allGather 275427 275427
775     where:
776         1) 275427 is the sendcount
777         2) 275427 is the recvcount
778         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
779   */
780   double clock = smpi_process()->simulated_elapsed();
781
782   CHECK_ACTION_PARAMS(action, 2, 2)
783   int sendcount=atoi(action[2]);
784   int recvcount=atoi(action[3]);
785
786   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
787
788   if(action[4] && action[5]) {
789     MPI_CURRENT_TYPE = decode_datatype(action[4]);
790     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
791   } else
792     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
793
794   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
795   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
796
797   int rank = smpi_process()->index();
798
799   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
800                                                                         encode_datatype(MPI_CURRENT_TYPE),
801                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
802
803   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
804
805   TRACE_smpi_comm_out(rank);
806   log_timed_action (action, clock);
807 }
808
809 static void action_allgatherv(const char *const *action) {
810   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
811         0 allGatherV 275427 275427 275427 275427 204020
812      where:
813         1) 275427 is the sendcount
814         2) The next four elements declare the recvcounts array
815         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
816   */
817   double clock = smpi_process()->simulated_elapsed();
818
819   int comm_size = MPI_COMM_WORLD->size();
820   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
821   int sendcount=atoi(action[2]);
822   int recvcounts[comm_size];
823   int disps[comm_size];
824   int recv_sum=0;
825   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
826
827   if(action[3+comm_size] && action[4+comm_size]) {
828     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
829     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
830   } else
831     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
832
833   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
834
835   for(int i=0;i<comm_size;i++) {
836     recvcounts[i] = atoi(action[i+3]);
837     recv_sum=recv_sum+recvcounts[i];
838     disps[i] = 0;
839   }
840   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
841
842   int rank = smpi_process()->index();
843
844   std::vector<int>* trace_recvcounts = new std::vector<int>;
845   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
846     trace_recvcounts->push_back(recvcounts[i]);
847
848   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
849                                              "allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
850                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
851
852   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
853                           MPI_COMM_WORLD);
854
855   TRACE_smpi_comm_out(rank);
856   log_timed_action (action, clock);
857 }
858
859 static void action_allToAllv(const char *const *action) {
860   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
861         0 allToAllV 100 1 7 10 12 100 1 70 10 5
862      where:
863         1) 100 is the size of the send buffer *sizeof(int),
864         2) 1 7 10 12 is the sendcounts array
865         3) 100*sizeof(int) is the size of the receiver buffer
866         4)  1 70 10 5 is the recvcounts array
867   */
868   double clock = smpi_process()->simulated_elapsed();
869
870   int comm_size = MPI_COMM_WORLD->size();
871   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
872   int send_size = 0;
873   int recv_size = 0;
874   int sendcounts[comm_size];
875   std::vector<int>* trace_sendcounts = new std::vector<int>;
876   int recvcounts[comm_size];
877   std::vector<int>* trace_recvcounts = new std::vector<int>;
878   int senddisps[comm_size];
879   int recvdisps[comm_size];
880
881   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
882
883   int send_buf_size=parse_double(action[2]);
884   int recv_buf_size=parse_double(action[3+comm_size]);
885   if(action[4+2*comm_size] && action[5+2*comm_size]) {
886     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
887     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
888   }
889   else
890     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
891
892   int rank       = smpi_process()->index();
893   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
894   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
895
896   for(int i=0;i<comm_size;i++) {
897     sendcounts[i] = atoi(action[i+3]);
898     trace_sendcounts->push_back(sendcounts[i]);
899     send_size += sendcounts[i];
900     recvcounts[i] = atoi(action[i+4+comm_size]);
901     trace_recvcounts->push_back(recvcounts[i]);
902     recv_size += recvcounts[i];
903     senddisps[i] = 0;
904     recvdisps[i] = 0;
905   }
906
907   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
908                                              "allToAllV", -1, send_size, trace_sendcounts, recv_size, trace_recvcounts,
909                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
910
911   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
912                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
913
914   TRACE_smpi_comm_out(rank);
915   log_timed_action (action, clock);
916 }
917
918 }} // namespace simgrid::smpi
919
920 /** @brief Only initialize the replay, don't do it for real */
921 void smpi_replay_init(int* argc, char*** argv)
922 {
923   simgrid::smpi::Process::init(argc, argv);
924   smpi_process()->mark_as_initialized();
925   smpi_process()->set_replaying(true);
926
927   int rank = smpi_process()->index();
928   TRACE_smpi_init(rank);
929   TRACE_smpi_computing_init(rank);
930   TRACE_smpi_comm_in(rank, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
931   TRACE_smpi_comm_out(rank);
932   xbt_replay_action_register("init",       simgrid::smpi::action_init);
933   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
934   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
935   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
936   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
937   xbt_replay_action_register("send",       simgrid::smpi::action_send);
938   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
939   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
940   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
941   xbt_replay_action_register("test",       simgrid::smpi::action_test);
942   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
943   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
944   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
945   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
946   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
947   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
948   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
949   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
950   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
951   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
952   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
953   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
954   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
955   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
956   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
957   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
958
959   //if we have a delayed start, sleep here.
960   if(*argc>2){
961     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
962     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
963     smpi_execute_flops(value);
964   } else {
965     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
966     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
967     smpi_execute_flops(0.0);
968   }
969 }
970
971 /** @brief actually run the replay after initialization */
972 void smpi_replay_main(int* argc, char*** argv)
973 {
974   simgrid::xbt::replay_runner(*argc, *argv);
975
976   /* and now, finalize everything */
977   /* One active process will stop. Decrease the counter*/
978   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
979   if (not get_reqq_self()->empty()) {
980     unsigned int count_requests=get_reqq_self()->size();
981     MPI_Request requests[count_requests];
982     MPI_Status status[count_requests];
983     unsigned int i=0;
984
985     for (auto const& req : *get_reqq_self()) {
986       requests[i] = req;
987       i++;
988     }
989     simgrid::smpi::Request::waitall(count_requests, requests, status);
990   }
991   delete get_reqq_self();
992   active_processes--;
993
994   if(active_processes==0){
995     /* Last process alive speaking: end the simulated timer */
996     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
997     xbt_free(sendbuffer);
998     xbt_free(recvbuffer);
999   }
1000
1001   TRACE_smpi_comm_in(smpi_process()->index(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1002
1003   smpi_process()->finalize();
1004
1005   TRACE_smpi_comm_out(smpi_process()->index());
1006   TRACE_smpi_finalize(smpi_process()->index());
1007 }
1008
1009 /** @brief chain a replay initialization and a replay start */
1010 void smpi_replay_run(int* argc, char*** argv)
1011 {
1012   smpi_replay_init(argc, argv);
1013   smpi_replay_main(argc, argv);
1014 }