Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
63774890f39659c8cc1deab72f8f5bac33a24bd2
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 int communicator_size = 0;
23 static int active_processes = 0;
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
25
26 MPI_Datatype MPI_DEFAULT_TYPE;
27 MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(smpi_process()->index());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({smpi_process()->index(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90 static MPI_Datatype decode_datatype(const char *const action)
91 {
92   switch(atoi(action)) {
93     case 0:
94       MPI_CURRENT_TYPE=MPI_DOUBLE;
95       break;
96     case 1:
97       MPI_CURRENT_TYPE=MPI_INT;
98       break;
99     case 2:
100       MPI_CURRENT_TYPE=MPI_CHAR;
101       break;
102     case 3:
103       MPI_CURRENT_TYPE=MPI_SHORT;
104       break;
105     case 4:
106       MPI_CURRENT_TYPE=MPI_LONG;
107       break;
108     case 5:
109       MPI_CURRENT_TYPE=MPI_FLOAT;
110       break;
111     case 6:
112       MPI_CURRENT_TYPE=MPI_BYTE;
113       break;
114     default:
115       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
116       break;
117   }
118    return MPI_CURRENT_TYPE;
119 }
120
121 const char* encode_datatype(MPI_Datatype datatype)
122 {
123   if (datatype==MPI_BYTE)
124       return "";
125   if(datatype==MPI_DOUBLE)
126       return "0";
127   if(datatype==MPI_INT)
128       return "1";
129   if(datatype==MPI_CHAR)
130       return "2";
131   if(datatype==MPI_SHORT)
132       return "3";
133   if(datatype==MPI_LONG)
134     return "4";
135   if(datatype==MPI_FLOAT)
136       return "5";
137   // default - not implemented.
138   // do not warn here as we pass in this function even for other trace formats
139   return "-1";
140 }
141
142 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
143     int i=0;\
144     while(action[i]!=nullptr)\
145      i++;\
146     if(i<mandatory+2)                                           \
147     THROWF(arg_error, 0, "%s replay failed.\n" \
148           "%d items were given on the line. First two should be process_id and action.  " \
149           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
150           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
151   }
152
153 namespace simgrid {
154 namespace smpi {
155
156 static void action_init(const char *const *action)
157 {
158   XBT_DEBUG("Initialize the counters");
159   CHECK_ACTION_PARAMS(action, 0, 1)
160   if(action[2])
161     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
162   else
163     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
164
165   /* start a simulated timer */
166   smpi_process()->simulated_start();
167   /*initialize the number of active processes */
168   active_processes = smpi_process_count();
169
170   set_reqq_self(new std::vector<MPI_Request>);
171 }
172
173 static void action_finalize(const char *const *action)
174 {
175   /* Nothing to do */
176 }
177
178 static void action_comm_size(const char *const *action)
179 {
180   communicator_size = parse_double(action[2]);
181   log_timed_action (action, smpi_process()->simulated_elapsed());
182 }
183
184 static void action_comm_split(const char *const *action)
185 {
186   log_timed_action (action, smpi_process()->simulated_elapsed());
187 }
188
189 static void action_comm_dup(const char *const *action)
190 {
191   log_timed_action (action, smpi_process()->simulated_elapsed());
192 }
193
194 static void action_compute(const char *const *action)
195 {
196   CHECK_ACTION_PARAMS(action, 1, 0)
197   double clock = smpi_process()->simulated_elapsed();
198   double flops= parse_double(action[2]);
199   int rank = smpi_process()->index();
200   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
201   extra->type=TRACING_COMPUTING;
202   extra->comp_size=flops;
203   TRACE_smpi_computing_in(rank, extra);
204
205   smpi_execute_flops(flops);
206
207   TRACE_smpi_computing_out(rank);
208   log_timed_action (action, clock);
209 }
210
211 static void action_send(const char *const *action)
212 {
213   CHECK_ACTION_PARAMS(action, 2, 1)
214   int to = atoi(action[2]);
215   double size=parse_double(action[3]);
216   double clock = smpi_process()->simulated_elapsed();
217
218   if(action[4])
219     MPI_CURRENT_TYPE=decode_datatype(action[4]);
220   else
221     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
222
223   int rank = smpi_process()->index();
224
225   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
226   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
227   extra->type = TRACING_SEND;
228   extra->send_size = size;
229   extra->src = rank;
230   extra->dst = dst_traced;
231   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
232   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
233   if (not TRACE_smpi_view_internals())
234     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
235
236   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
237
238   log_timed_action (action, clock);
239
240   TRACE_smpi_comm_out(rank);
241 }
242
243 static void action_Isend(const char *const *action)
244 {
245   CHECK_ACTION_PARAMS(action, 2, 1)
246   int to = atoi(action[2]);
247   double size=parse_double(action[3]);
248   double clock = smpi_process()->simulated_elapsed();
249
250   if(action[4])
251     MPI_CURRENT_TYPE=decode_datatype(action[4]);
252   else
253     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
254
255   int rank = smpi_process()->index();
256   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
257   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
258   extra->type = TRACING_ISEND;
259   extra->send_size = size;
260   extra->src = rank;
261   extra->dst = dst_traced;
262   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
263   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
264   if (not TRACE_smpi_view_internals())
265     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
266
267   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
268
269   TRACE_smpi_comm_out(rank);
270
271   get_reqq_self()->push_back(request);
272
273   log_timed_action (action, clock);
274 }
275
276 static void action_recv(const char *const *action) {
277   CHECK_ACTION_PARAMS(action, 2, 1)
278   int from = atoi(action[2]);
279   double size=parse_double(action[3]);
280   double clock = smpi_process()->simulated_elapsed();
281   MPI_Status status;
282
283   if(action[4])
284     MPI_CURRENT_TYPE=decode_datatype(action[4]);
285   else
286     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
287
288   int rank = smpi_process()->index();
289   int src_traced = MPI_COMM_WORLD->group()->rank(from);
290
291   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
292   extra->type = TRACING_RECV;
293   extra->send_size = size;
294   extra->src = src_traced;
295   extra->dst = rank;
296   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
297   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
298
299   //unknown size from the receiver point of view
300   if(size<=0.0){
301     Request::probe(from, 0, MPI_COMM_WORLD, &status);
302     size=status.count;
303   }
304
305   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
306
307   TRACE_smpi_comm_out(rank);
308   if (not TRACE_smpi_view_internals()) {
309     TRACE_smpi_recv(src_traced, rank, 0);
310   }
311
312   log_timed_action (action, clock);
313 }
314
315 static void action_Irecv(const char *const *action)
316 {
317   CHECK_ACTION_PARAMS(action, 2, 1)
318   int from = atoi(action[2]);
319   double size=parse_double(action[3]);
320   double clock = smpi_process()->simulated_elapsed();
321
322   if(action[4])
323     MPI_CURRENT_TYPE=decode_datatype(action[4]);
324   else
325     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
326
327   int rank = smpi_process()->index();
328   int src_traced = MPI_COMM_WORLD->group()->rank(from);
329   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
330   extra->type = TRACING_IRECV;
331   extra->send_size = size;
332   extra->src = src_traced;
333   extra->dst = rank;
334   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
335   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
336   MPI_Status status;
337   //unknow size from the receiver pov
338   if(size<=0.0){
339       Request::probe(from, 0, MPI_COMM_WORLD, &status);
340       size=status.count;
341   }
342
343   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
344
345   TRACE_smpi_comm_out(rank);
346   get_reqq_self()->push_back(request);
347
348   log_timed_action (action, clock);
349 }
350
351 static void action_test(const char *const *action){
352   CHECK_ACTION_PARAMS(action, 0, 0)
353   double clock = smpi_process()->simulated_elapsed();
354   MPI_Status status;
355
356   MPI_Request request = get_reqq_self()->back();
357   get_reqq_self()->pop_back();
358   //if request is null here, this may mean that a previous test has succeeded
359   //Different times in traced application and replayed version may lead to this
360   //In this case, ignore the extra calls.
361   if(request!=nullptr){
362     int rank = smpi_process()->index();
363     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
364     extra->type=TRACING_TEST;
365     TRACE_smpi_testing_in(rank, extra);
366
367     int flag = Request::test(&request, &status);
368
369     XBT_DEBUG("MPI_Test result: %d", flag);
370     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
371     get_reqq_self()->push_back(request);
372
373     TRACE_smpi_testing_out(rank);
374   }
375   log_timed_action (action, clock);
376 }
377
378 static void action_wait(const char *const *action){
379   CHECK_ACTION_PARAMS(action, 0, 0)
380   double clock = smpi_process()->simulated_elapsed();
381   MPI_Status status;
382
383   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
384       xbt_str_join_array(action," "));
385   MPI_Request request = get_reqq_self()->back();
386   get_reqq_self()->pop_back();
387
388   if (request==nullptr){
389     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
390     return;
391   }
392
393   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
394
395   MPI_Group group = request->comm()->group();
396   int src_traced = group->rank(request->src());
397   int dst_traced = group->rank(request->dst());
398   int is_wait_for_receive = (request->flags() & RECV);
399   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
400   extra->type = TRACING_WAIT;
401   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
402
403   Request::wait(&request, &status);
404
405   TRACE_smpi_comm_out(rank);
406   if (is_wait_for_receive)
407     TRACE_smpi_recv(src_traced, dst_traced, 0);
408   log_timed_action (action, clock);
409 }
410
411 static void action_waitall(const char *const *action){
412   CHECK_ACTION_PARAMS(action, 0, 0)
413   double clock = smpi_process()->simulated_elapsed();
414   const unsigned int count_requests = get_reqq_self()->size();
415
416   if (count_requests>0) {
417     MPI_Status status[count_requests];
418
419    int rank_traced = smpi_process()->index();
420    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
421    extra->type = TRACING_WAITALL;
422    extra->send_size=count_requests;
423    TRACE_smpi_comm_in(rank_traced, __FUNCTION__, extra);
424    int recvs_snd[count_requests];
425    int recvs_rcv[count_requests];
426    for (unsigned int i = 0; i < count_requests; i++) {
427      const auto& req = (*get_reqq_self())[i];
428      if (req && (req->flags () & RECV)){
429        recvs_snd[i]=req->src();
430        recvs_rcv[i]=req->dst();
431      }else
432        recvs_snd[i]=-100;
433    }
434    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
435
436    for (unsigned i = 0; i < count_requests; i++) {
437      if (recvs_snd[i]!=-100)
438        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
439    }
440    TRACE_smpi_comm_out(rank_traced);
441   }
442   log_timed_action (action, clock);
443 }
444
445 static void action_barrier(const char *const *action){
446   double clock = smpi_process()->simulated_elapsed();
447   int rank = smpi_process()->index();
448   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
449   extra->type = TRACING_BARRIER;
450   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
451
452   Colls::barrier(MPI_COMM_WORLD);
453
454   TRACE_smpi_comm_out(rank);
455   log_timed_action (action, clock);
456 }
457
458 static void action_bcast(const char *const *action)
459 {
460   CHECK_ACTION_PARAMS(action, 1, 2)
461   double size = parse_double(action[2]);
462   double clock = smpi_process()->simulated_elapsed();
463   int root=0;
464   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
465   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
466
467   if(action[3]) {
468     root= atoi(action[3]);
469     if(action[4])
470       MPI_CURRENT_TYPE=decode_datatype(action[4]);
471   }
472
473   int rank = smpi_process()->index();
474   int root_traced = MPI_COMM_WORLD->group()->index(root);
475
476   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
477   extra->type = TRACING_BCAST;
478   extra->send_size = size;
479   extra->root = root_traced;
480   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
481   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
482   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
483
484   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
485
486   TRACE_smpi_comm_out(rank);
487   log_timed_action (action, clock);
488 }
489
490 static void action_reduce(const char *const *action)
491 {
492   CHECK_ACTION_PARAMS(action, 2, 2)
493   double comm_size = parse_double(action[2]);
494   double comp_size = parse_double(action[3]);
495   double clock = smpi_process()->simulated_elapsed();
496   int root=0;
497   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498
499   if(action[4]) {
500     root= atoi(action[4]);
501     if(action[5])
502       MPI_CURRENT_TYPE=decode_datatype(action[5]);
503   }
504
505   int rank = smpi_process()->index();
506   int root_traced = MPI_COMM_WORLD->group()->rank(root);
507   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
508   extra->type = TRACING_REDUCE;
509   extra->send_size = comm_size;
510   extra->comp_size = comp_size;
511   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
512   extra->root = root_traced;
513
514   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
515
516   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
517   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
518   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
519   smpi_execute_flops(comp_size);
520
521   TRACE_smpi_comm_out(rank);
522   log_timed_action (action, clock);
523 }
524
525 static void action_allReduce(const char *const *action) {
526   CHECK_ACTION_PARAMS(action, 2, 1)
527   double comm_size = parse_double(action[2]);
528   double comp_size = parse_double(action[3]);
529
530   if(action[4])
531     MPI_CURRENT_TYPE=decode_datatype(action[4]);
532   else
533     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
534
535   double clock = smpi_process()->simulated_elapsed();
536   int rank = smpi_process()->index();
537   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538   extra->type = TRACING_ALLREDUCE;
539   extra->send_size = comm_size;
540   extra->comp_size = comp_size;
541   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
542   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
543
544   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
545   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
546   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
547   smpi_execute_flops(comp_size);
548
549   TRACE_smpi_comm_out(rank);
550   log_timed_action (action, clock);
551 }
552
553 static void action_allToAll(const char *const *action) {
554   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
555   double clock = smpi_process()->simulated_elapsed();
556   int comm_size = MPI_COMM_WORLD->size();
557   int send_size = parse_double(action[2]);
558   int recv_size = parse_double(action[3]);
559   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
560
561   if(action[4] && action[5]) {
562     MPI_CURRENT_TYPE=decode_datatype(action[4]);
563     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
564   }
565   else
566     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
567
568   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
569   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
570
571   int rank = smpi_process()->index();
572   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573   extra->type = TRACING_ALLTOALL;
574   extra->send_size = send_size;
575   extra->recv_size = recv_size;
576   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
577   extra->datatype2       = encode_datatype(MPI_CURRENT_TYPE2);
578
579   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
580
581   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
582
583   TRACE_smpi_comm_out(rank);
584   log_timed_action (action, clock);
585 }
586
587 static void action_gather(const char *const *action) {
588   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
589         0 gather 68 68 0 0 0
590       where:
591         1) 68 is the sendcounts
592         2) 68 is the recvcounts
593         3) 0 is the root node
594         4) 0 is the send datatype id, see decode_datatype()
595         5) 0 is the recv datatype id, see decode_datatype()
596   */
597   CHECK_ACTION_PARAMS(action, 2, 3)
598   double clock = smpi_process()->simulated_elapsed();
599   int comm_size = MPI_COMM_WORLD->size();
600   int send_size = parse_double(action[2]);
601   int recv_size = parse_double(action[3]);
602   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
603   if(action[4] && action[5]) {
604     MPI_CURRENT_TYPE=decode_datatype(action[5]);
605     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
606   } else {
607     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
608   }
609   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
610   void *recv = nullptr;
611   int root=0;
612   if(action[4])
613     root=atoi(action[4]);
614   int rank = MPI_COMM_WORLD->rank();
615
616   if(rank==root)
617     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
618
619   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
620   extra->type = TRACING_GATHER;
621   extra->send_size = send_size;
622   extra->recv_size = recv_size;
623   extra->root = root;
624   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
625   extra->datatype2       = encode_datatype(MPI_CURRENT_TYPE2);
626
627   TRACE_smpi_comm_in(smpi_process()->index(), __FUNCTION__, extra);
628
629   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
630
631   TRACE_smpi_comm_out(smpi_process()->index());
632   log_timed_action (action, clock);
633 }
634
635 static void action_gatherv(const char *const *action) {
636   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
637        0 gather 68 68 10 10 10 0 0 0
638      where:
639        1) 68 is the sendcount
640        2) 68 10 10 10 is the recvcounts
641        3) 0 is the root node
642        4) 0 is the send datatype id, see decode_datatype()
643        5) 0 is the recv datatype id, see decode_datatype()
644   */
645   double clock = smpi_process()->simulated_elapsed();
646   int comm_size = MPI_COMM_WORLD->size();
647   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
648   int send_size = parse_double(action[2]);
649   int disps[comm_size];
650   int recvcounts[comm_size];
651   int recv_sum=0;
652
653   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
654   if(action[4+comm_size] && action[5+comm_size]) {
655     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
656     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
657   } else
658     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
659
660   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
661   void *recv = nullptr;
662   for(int i=0;i<comm_size;i++) {
663     recvcounts[i] = atoi(action[i+3]);
664     recv_sum=recv_sum+recvcounts[i];
665     disps[i]=0;
666   }
667
668   int root=atoi(action[3+comm_size]);
669   int rank = MPI_COMM_WORLD->rank();
670
671   if(rank==root)
672     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
673
674   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
675   extra->type            = TRACING_GATHERV;
676   extra->send_size       = send_size;
677   extra->recvcounts      = new int[comm_size];
678   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
679     extra->recvcounts[i] = recvcounts[i];
680   extra->root = root;
681   extra->num_processes = comm_size;
682   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
683   extra->datatype2       = encode_datatype(MPI_CURRENT_TYPE2);
684
685   TRACE_smpi_comm_in(smpi_process()->index(), __FUNCTION__, extra);
686
687   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
688
689   TRACE_smpi_comm_out(smpi_process()->index());
690   log_timed_action (action, clock);
691 }
692
693 static void action_reducescatter(const char *const *action) {
694  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
695       0 reduceScatter 275427 275427 275427 204020 11346849 0
696     where:
697       1) The first four values after the name of the action declare the recvcounts array
698       2) The value 11346849 is the amount of instructions
699       3) The last value corresponds to the datatype, see decode_datatype().
700 */
701   double clock = smpi_process()->simulated_elapsed();
702   int comm_size = MPI_COMM_WORLD->size();
703   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
704   int comp_size = parse_double(action[2+comm_size]);
705   int recvcounts[comm_size];
706   int rank = smpi_process()->index();
707   int size = 0;
708   if(action[3+comm_size])
709     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
710   else
711     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
712
713   for(int i=0;i<comm_size;i++) {
714     recvcounts[i] = atoi(action[i+2]);
715     size+=recvcounts[i];
716   }
717
718   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
719   extra->type = TRACING_REDUCE_SCATTER;
720   extra->send_size = 0;
721   extra->recvcounts      = new int[comm_size];
722   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
723     extra->recvcounts[i] = recvcounts[i];
724   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
725   extra->comp_size = comp_size;
726   extra->num_processes = comm_size;
727
728   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
729
730   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
731   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
732
733   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
734   smpi_execute_flops(comp_size);
735
736   TRACE_smpi_comm_out(rank);
737   log_timed_action (action, clock);
738 }
739
740 static void action_allgather(const char *const *action) {
741   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
742         0 allGather 275427 275427
743     where:
744         1) 275427 is the sendcount
745         2) 275427 is the recvcount
746         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
747   */
748   double clock = smpi_process()->simulated_elapsed();
749
750   CHECK_ACTION_PARAMS(action, 2, 2)
751   int sendcount=atoi(action[2]);
752   int recvcount=atoi(action[3]);
753
754   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
755
756   if(action[4] && action[5]) {
757     MPI_CURRENT_TYPE = decode_datatype(action[4]);
758     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
759   } else
760     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
761
762   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
763   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
764
765   int rank = smpi_process()->index();
766   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
767   extra->type = TRACING_ALLGATHER;
768   extra->send_size = sendcount;
769   extra->recv_size= recvcount;
770   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
771   extra->datatype2       = encode_datatype(MPI_CURRENT_TYPE2);
772   extra->num_processes = MPI_COMM_WORLD->size();
773
774   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
775
776   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
777
778   TRACE_smpi_comm_out(rank);
779   log_timed_action (action, clock);
780 }
781
782 static void action_allgatherv(const char *const *action) {
783   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
784         0 allGatherV 275427 275427 275427 275427 204020
785      where:
786         1) 275427 is the sendcount
787         2) The next four elements declare the recvcounts array
788         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
789   */
790   double clock = smpi_process()->simulated_elapsed();
791
792   int comm_size = MPI_COMM_WORLD->size();
793   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
794   int sendcount=atoi(action[2]);
795   int recvcounts[comm_size];
796   int disps[comm_size];
797   int recv_sum=0;
798   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
799
800   if(action[3+comm_size] && action[4+comm_size]) {
801     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
802     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
803   } else
804     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
805
806   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
807
808   for(int i=0;i<comm_size;i++) {
809     recvcounts[i] = atoi(action[i+3]);
810     recv_sum=recv_sum+recvcounts[i];
811     disps[i] = 0;
812   }
813   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
814
815   int rank = smpi_process()->index();
816   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
817   extra->type = TRACING_ALLGATHERV;
818   extra->send_size = sendcount;
819   extra->recvcounts      = new int[comm_size];
820   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
821     extra->recvcounts[i] = recvcounts[i];
822   extra->datatype1       = encode_datatype(MPI_CURRENT_TYPE);
823   extra->datatype2       = encode_datatype(MPI_CURRENT_TYPE2);
824   extra->num_processes = comm_size;
825
826   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
827
828   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
829                           MPI_COMM_WORLD);
830
831   TRACE_smpi_comm_out(rank);
832   log_timed_action (action, clock);
833 }
834
835 static void action_allToAllv(const char *const *action) {
836   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
837         0 allToAllV 100 1 7 10 12 100 1 70 10 5
838      where:
839         1) 100 is the size of the send buffer *sizeof(int),
840         2) 1 7 10 12 is the sendcounts array
841         3) 100*sizeof(int) is the size of the receiver buffer
842         4)  1 70 10 5 is the recvcounts array
843   */
844   double clock = smpi_process()->simulated_elapsed();
845
846   int comm_size = MPI_COMM_WORLD->size();
847   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
848   int sendcounts[comm_size];
849   int recvcounts[comm_size];
850   int senddisps[comm_size];
851   int recvdisps[comm_size];
852
853   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
854
855   int send_buf_size=parse_double(action[2]);
856   int recv_buf_size=parse_double(action[3+comm_size]);
857   if(action[4+2*comm_size] && action[5+2*comm_size]) {
858     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
859     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
860   }
861   else
862     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
863
864   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
865   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
866
867   for(int i=0;i<comm_size;i++) {
868     sendcounts[i] = atoi(action[i+3]);
869     recvcounts[i] = atoi(action[i+4+comm_size]);
870     senddisps[i] = 0;
871     recvdisps[i] = 0;
872   }
873
874   int rank = smpi_process()->index();
875   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
876   extra->type = TRACING_ALLTOALLV;
877   extra->recvcounts      = new int[comm_size];
878   extra->sendcounts      = new int[comm_size];
879   extra->num_processes = comm_size;
880
881   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
882     extra->send_size += sendcounts[i];
883     extra->sendcounts[i] = sendcounts[i];
884     extra->recv_size += recvcounts[i];
885     extra->recvcounts[i] = recvcounts[i];
886   }
887   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
888   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
889
890   TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
891
892   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
893                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
894
895   TRACE_smpi_comm_out(rank);
896   log_timed_action (action, clock);
897 }
898
899 }} // namespace simgrid::smpi
900
901 /** @brief Only initialize the replay, don't do it for real */
902 void smpi_replay_init(int* argc, char*** argv)
903 {
904   simgrid::smpi::Process::init(argc, argv);
905   smpi_process()->mark_as_initialized();
906   smpi_process()->set_replaying(true);
907
908   int rank = smpi_process()->index();
909   TRACE_smpi_init(rank);
910   TRACE_smpi_computing_init(rank);
911   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
912   extra->type = TRACING_INIT;
913   TRACE_smpi_comm_in(rank, "smpi_replay_run_init", extra);
914   TRACE_smpi_comm_out(rank);
915   xbt_replay_action_register("init",       simgrid::smpi::action_init);
916   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
917   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
918   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
919   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
920   xbt_replay_action_register("send",       simgrid::smpi::action_send);
921   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
922   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
923   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
924   xbt_replay_action_register("test",       simgrid::smpi::action_test);
925   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
926   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
927   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
928   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
929   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
930   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
931   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
932   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
933   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
934   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
935   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
936   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
937   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
938   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
939
940   //if we have a delayed start, sleep here.
941   if(*argc>2){
942     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
943     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
944     smpi_execute_flops(value);
945   } else {
946     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
947     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
948     smpi_execute_flops(0.0);
949   }
950 }
951
952 /** @brief actually run the replay after initialization */
953 void smpi_replay_main(int* argc, char*** argv)
954 {
955   simgrid::xbt::replay_runner(*argc, *argv);
956
957   /* and now, finalize everything */
958   /* One active process will stop. Decrease the counter*/
959   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
960   if (not get_reqq_self()->empty()) {
961     unsigned int count_requests=get_reqq_self()->size();
962     MPI_Request requests[count_requests];
963     MPI_Status status[count_requests];
964     unsigned int i=0;
965
966     for (auto const& req : *get_reqq_self()) {
967       requests[i] = req;
968       i++;
969     }
970     simgrid::smpi::Request::waitall(count_requests, requests, status);
971   }
972   delete get_reqq_self();
973   active_processes--;
974
975   if(active_processes==0){
976     /* Last process alive speaking: end the simulated timer */
977     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
978     xbt_free(sendbuffer);
979     xbt_free(recvbuffer);
980   }
981
982   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
983   extra_fin->type = TRACING_FINALIZE;
984   TRACE_smpi_comm_in(smpi_process()->index(), "smpi_replay_run_finalize", extra_fin);
985
986   smpi_process()->finalize();
987
988   TRACE_smpi_comm_out(smpi_process()->index());
989   TRACE_smpi_finalize(smpi_process()->index());
990 }
991
992 /** @brief chain a replay initialization and a replay start */
993 void smpi_replay_run(int* argc, char*** argv)
994 {
995   smpi_replay_init(argc, argv);
996   smpi_replay_main(argc, argv);
997 }