Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix tests for shared and partial-shared.
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.h"
7 #include "xbt/replay.hpp"
8 #include <unordered_map>
9 #include <vector>
10
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static int sendbuffer_size=0;
23 char* sendbuffer=nullptr;
24 static int recvbuffer_size=0;
25 char* recvbuffer=nullptr;
26
27 static void log_timed_action (const char *const *action, double clock){
28   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
29     char *name = xbt_str_join_array(action, " ");
30     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
31     xbt_free(name);
32   }
33 }
34
35 static std::vector<MPI_Request>* get_reqq_self()
36 {
37   return reqq.at(smpi_process()->index());
38 }
39
40 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
41 {
42    reqq.insert({smpi_process()->index(), mpi_request});
43 }
44
45 //allocate a single buffer for all sends, growing it if needed
46 void* smpi_get_tmp_sendbuffer(int size)
47 {
48   if (!smpi_process()->replaying())
49     return xbt_malloc(size);
50   if (sendbuffer_size<size){
51     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
52     sendbuffer_size=size;
53   }
54   return sendbuffer;
55 }
56
57 //allocate a single buffer for all recv
58 void* smpi_get_tmp_recvbuffer(int size){
59   if (!smpi_process()->replaying())
60     return xbt_malloc(size);
61   if (recvbuffer_size<size){
62     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
63     recvbuffer_size=size;
64   }
65   return recvbuffer;
66 }
67
68 void smpi_free_tmp_buffer(void* buf){
69   if (!smpi_process()->replaying())
70     xbt_free(buf);
71 }
72
73 /* Helper function */
74 static double parse_double(const char *string)
75 {
76   char *endptr;
77   double value = strtod(string, &endptr);
78   if (*endptr != '\0')
79     THROWF(unknown_error, 0, "%s is not a double", string);
80   return value;
81 }
82
83 static MPI_Datatype decode_datatype(const char *const action)
84 {
85   switch(atoi(action)) {
86     case 0:
87       MPI_CURRENT_TYPE=MPI_DOUBLE;
88       break;
89     case 1:
90       MPI_CURRENT_TYPE=MPI_INT;
91       break;
92     case 2:
93       MPI_CURRENT_TYPE=MPI_CHAR;
94       break;
95     case 3:
96       MPI_CURRENT_TYPE=MPI_SHORT;
97       break;
98     case 4:
99       MPI_CURRENT_TYPE=MPI_LONG;
100       break;
101     case 5:
102       MPI_CURRENT_TYPE=MPI_FLOAT;
103       break;
104     case 6:
105       MPI_CURRENT_TYPE=MPI_BYTE;
106       break;
107     default:
108       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
109   }
110    return MPI_CURRENT_TYPE;
111 }
112
113 const char* encode_datatype(MPI_Datatype datatype, int* known)
114 {
115   //default type for output is set to MPI_BYTE
116   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
117   if(known!=nullptr)
118     *known=1;
119   if (datatype==MPI_BYTE)
120       return "";
121   if(datatype==MPI_DOUBLE)
122       return "0";
123   if(datatype==MPI_INT)
124       return "1";
125   if(datatype==MPI_CHAR)
126       return "2";
127   if(datatype==MPI_SHORT)
128       return "3";
129   if(datatype==MPI_LONG)
130     return "4";
131   if(datatype==MPI_FLOAT)
132       return "5";
133   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
134   if(known!=nullptr)
135     *known=0;
136   // default - not implemented.
137   // do not warn here as we pass in this function even for other trace formats
138   return "-1";
139 }
140
141 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
142     int i=0;\
143     while(action[i]!=nullptr)\
144      i++;\
145     if(i<mandatory+2)                                           \
146     THROWF(arg_error, 0, "%s replay failed.\n" \
147           "%d items were given on the line. First two should be process_id and action.  " \
148           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
149           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
150   }
151
152 namespace simgrid {
153 namespace smpi {
154
155 static void action_init(const char *const *action)
156 {
157   XBT_DEBUG("Initialize the counters");
158   CHECK_ACTION_PARAMS(action, 0, 1)
159   if(action[2]) 
160     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
161   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
162
163   /* start a simulated timer */
164   smpi_process()->simulated_start();
165   /*initialize the number of active processes */
166   active_processes = smpi_process_count();
167
168   set_reqq_self(new std::vector<MPI_Request>);
169 }
170
171 static void action_finalize(const char *const *action)
172 {
173   /* Nothing to do */
174 }
175
176 static void action_comm_size(const char *const *action)
177 {
178   communicator_size = parse_double(action[2]);
179   log_timed_action (action, smpi_process()->simulated_elapsed());
180 }
181
182 static void action_comm_split(const char *const *action)
183 {
184   log_timed_action (action, smpi_process()->simulated_elapsed());
185 }
186
187 static void action_comm_dup(const char *const *action)
188 {
189   log_timed_action (action, smpi_process()->simulated_elapsed());
190 }
191
192 static void action_compute(const char *const *action)
193 {
194   CHECK_ACTION_PARAMS(action, 1, 0)
195   double clock = smpi_process()->simulated_elapsed();
196   double flops= parse_double(action[2]);
197   int rank = smpi_process()->index();
198   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
199   extra->type=TRACING_COMPUTING;
200   extra->comp_size=flops;
201   TRACE_smpi_computing_in(rank, extra);
202
203   smpi_execute_flops(flops);
204
205   TRACE_smpi_computing_out(rank);
206   log_timed_action (action, clock);
207 }
208
209 static void action_send(const char *const *action)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   int to = atoi(action[2]);
213   double size=parse_double(action[3]);
214   double clock = smpi_process()->simulated_elapsed();
215
216   if(action[4])
217     MPI_CURRENT_TYPE=decode_datatype(action[4]);
218   else
219     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
220
221   int rank = smpi_process()->index();
222
223   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
224   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
225   extra->type = TRACING_SEND;
226   extra->send_size = size;
227   extra->src = rank;
228   extra->dst = dst_traced;
229   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
230   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
231   if (!TRACE_smpi_view_internals())
232     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
233
234   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
235
236   log_timed_action (action, clock);
237
238   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
239 }
240
241 static void action_Isend(const char *const *action)
242 {
243   CHECK_ACTION_PARAMS(action, 2, 1)
244   int to = atoi(action[2]);
245   double size=parse_double(action[3]);
246   double clock = smpi_process()->simulated_elapsed();
247
248   if(action[4]) 
249     MPI_CURRENT_TYPE=decode_datatype(action[4]);
250   else 
251     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
252
253   int rank = smpi_process()->index();
254   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
255   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
256   extra->type = TRACING_ISEND;
257   extra->send_size = size;
258   extra->src = rank;
259   extra->dst = dst_traced;
260   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
261   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
262   if (!TRACE_smpi_view_internals())
263     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
264
265   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
266
267   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
268
269   get_reqq_self()->push_back(request);
270
271   log_timed_action (action, clock);
272 }
273
274 static void action_recv(const char *const *action) {
275   CHECK_ACTION_PARAMS(action, 2, 1)
276   int from = atoi(action[2]);
277   double size=parse_double(action[3]);
278   double clock = smpi_process()->simulated_elapsed();
279   MPI_Status status;
280
281   if(action[4]) 
282     MPI_CURRENT_TYPE=decode_datatype(action[4]);
283   else 
284     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
285
286   int rank = smpi_process()->index();
287   int src_traced = MPI_COMM_WORLD->group()->rank(from);
288
289   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
290   extra->type = TRACING_RECV;
291   extra->send_size = size;
292   extra->src = src_traced;
293   extra->dst = rank;
294   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
295   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
296
297   //unknown size from the receiver point of view
298   if(size<=0.0){
299     Request::probe(from, 0, MPI_COMM_WORLD, &status);
300     size=status.count;
301   }
302
303   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
304
305   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
306   if (!TRACE_smpi_view_internals()) {
307     TRACE_smpi_recv(rank, src_traced, rank, 0);
308   }
309
310   log_timed_action (action, clock);
311 }
312
313 static void action_Irecv(const char *const *action)
314 {
315   CHECK_ACTION_PARAMS(action, 2, 1)
316   int from = atoi(action[2]);
317   double size=parse_double(action[3]);
318   double clock = smpi_process()->simulated_elapsed();
319
320   if(action[4]) 
321     MPI_CURRENT_TYPE=decode_datatype(action[4]);
322   else 
323     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
324
325   int rank = smpi_process()->index();
326   int src_traced = MPI_COMM_WORLD->group()->rank(from);
327   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328   extra->type = TRACING_IRECV;
329   extra->send_size = size;
330   extra->src = src_traced;
331   extra->dst = rank;
332   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
333   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
334   MPI_Status status;
335   //unknow size from the receiver pov
336   if(size<=0.0){
337       Request::probe(from, 0, MPI_COMM_WORLD, &status);
338       size=status.count;
339   }
340
341   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
342
343   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
344   get_reqq_self()->push_back(request);
345
346   log_timed_action (action, clock);
347 }
348
349 static void action_test(const char *const *action){
350   CHECK_ACTION_PARAMS(action, 0, 0)
351   double clock = smpi_process()->simulated_elapsed();
352   MPI_Status status;
353
354   MPI_Request request = get_reqq_self()->back();
355   get_reqq_self()->pop_back();
356   //if request is null here, this may mean that a previous test has succeeded 
357   //Different times in traced application and replayed version may lead to this 
358   //In this case, ignore the extra calls.
359   if(request!=nullptr){
360     int rank = smpi_process()->index();
361     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
362     extra->type=TRACING_TEST;
363     TRACE_smpi_testing_in(rank, extra);
364
365     int flag = Request::test(&request, &status);
366
367     XBT_DEBUG("MPI_Test result: %d", flag);
368     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
369     get_reqq_self()->push_back(request);
370
371     TRACE_smpi_testing_out(rank);
372   }
373   log_timed_action (action, clock);
374 }
375
376 static void action_wait(const char *const *action){
377   CHECK_ACTION_PARAMS(action, 0, 0)
378   double clock = smpi_process()->simulated_elapsed();
379   MPI_Status status;
380
381   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
382       xbt_str_join_array(action," "));
383   MPI_Request request = get_reqq_self()->back();
384   get_reqq_self()->pop_back();
385
386   if (request==nullptr){
387     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
388     return;
389   }
390
391   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
392
393   MPI_Group group = request->comm()->group();
394   int src_traced = group->rank(request->src());
395   int dst_traced = group->rank(request->dst());
396   int is_wait_for_receive = (request->flags() & RECV);
397   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
398   extra->type = TRACING_WAIT;
399   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
400
401   Request::wait(&request, &status);
402
403   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
404   if (is_wait_for_receive)
405     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
406   log_timed_action (action, clock);
407 }
408
409 static void action_waitall(const char *const *action){
410   CHECK_ACTION_PARAMS(action, 0, 0)
411   double clock = smpi_process()->simulated_elapsed();
412   unsigned int count_requests=get_reqq_self()->size();
413
414   if (count_requests>0) {
415     MPI_Status status[count_requests];
416
417    int rank_traced = smpi_process()->index();
418    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
419    extra->type = TRACING_WAITALL;
420    extra->send_size=count_requests;
421    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
422    int recvs_snd[count_requests];
423    int recvs_rcv[count_requests];
424    unsigned int i=0;
425    for (auto req : *(get_reqq_self())){
426      if (req && (req->flags () & RECV)){
427        recvs_snd[i]=req->src();
428        recvs_rcv[i]=req->dst();
429      }else
430        recvs_snd[i]=-100;
431      i++;
432    }
433    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
434
435    for (i=0; i<count_requests;i++){
436      if (recvs_snd[i]!=-100)
437        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
438    }
439    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
440   }
441   log_timed_action (action, clock);
442 }
443
444 static void action_barrier(const char *const *action){
445   double clock = smpi_process()->simulated_elapsed();
446   int rank = smpi_process()->index();
447   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
448   extra->type = TRACING_BARRIER;
449   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
450
451   Colls::barrier(MPI_COMM_WORLD);
452
453   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
454   log_timed_action (action, clock);
455 }
456
457 static void action_bcast(const char *const *action)
458 {
459   CHECK_ACTION_PARAMS(action, 1, 2)
460   double size = parse_double(action[2]);
461   double clock = smpi_process()->simulated_elapsed();
462   int root=0;
463   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
464   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
465
466   if(action[3]) {
467     root= atoi(action[3]);
468     if(action[4])
469       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
470   }
471
472   int rank = smpi_process()->index();
473   int root_traced = MPI_COMM_WORLD->group()->index(root);
474
475   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
476   extra->type = TRACING_BCAST;
477   extra->send_size = size;
478   extra->root = root_traced;
479   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
480   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
481   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
482
483   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
484
485   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
486   log_timed_action (action, clock);
487 }
488
489 static void action_reduce(const char *const *action)
490 {
491   CHECK_ACTION_PARAMS(action, 2, 2)
492   double comm_size = parse_double(action[2]);
493   double comp_size = parse_double(action[3]);
494   double clock = smpi_process()->simulated_elapsed();
495   int root=0;
496   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
497
498   if(action[4]) {
499     root= atoi(action[4]);
500     if(action[5])
501       MPI_CURRENT_TYPE=decode_datatype(action[5]);
502   }
503
504   int rank = smpi_process()->index();
505   int root_traced = MPI_COMM_WORLD->group()->rank(root);
506   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
507   extra->type = TRACING_REDUCE;
508   extra->send_size = comm_size;
509   extra->comp_size = comp_size;
510   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
511   extra->root = root_traced;
512
513   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
514
515   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
516   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
517   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
518   smpi_execute_flops(comp_size);
519
520   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
521   log_timed_action (action, clock);
522 }
523
524 static void action_allReduce(const char *const *action) {
525   CHECK_ACTION_PARAMS(action, 2, 1)
526   double comm_size = parse_double(action[2]);
527   double comp_size = parse_double(action[3]);
528
529   if(action[4])
530     MPI_CURRENT_TYPE=decode_datatype(action[4]);
531   else
532     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
533
534   double clock = smpi_process()->simulated_elapsed();
535   int rank = smpi_process()->index();
536   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537   extra->type = TRACING_ALLREDUCE;
538   extra->send_size = comm_size;
539   extra->comp_size = comp_size;
540   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
541   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
542
543   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
544   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
545   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
546   smpi_execute_flops(comp_size);
547
548   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
549   log_timed_action (action, clock);
550 }
551
552 static void action_allToAll(const char *const *action) {
553   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
554   double clock = smpi_process()->simulated_elapsed();
555   int comm_size = MPI_COMM_WORLD->size();
556   int send_size = parse_double(action[2]);
557   int recv_size = parse_double(action[3]);
558   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
559
560   if(action[4] && action[5]) {
561     MPI_CURRENT_TYPE=decode_datatype(action[4]);
562     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
563   }
564   else
565     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
566
567   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
568   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
569
570   int rank = smpi_process()->index();
571   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
572   extra->type = TRACING_ALLTOALL;
573   extra->send_size = send_size;
574   extra->recv_size = recv_size;
575   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
576   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
577
578   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
579
580   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
581
582   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
583   log_timed_action (action, clock);
584 }
585
586 static void action_gather(const char *const *action) {
587   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
588         0 gather 68 68 0 0 0
589       where:
590         1) 68 is the sendcounts
591         2) 68 is the recvcounts
592         3) 0 is the root node
593         4) 0 is the send datatype id, see decode_datatype()
594         5) 0 is the recv datatype id, see decode_datatype()
595   */
596   CHECK_ACTION_PARAMS(action, 2, 3)
597   double clock = smpi_process()->simulated_elapsed();
598   int comm_size = MPI_COMM_WORLD->size();
599   int send_size = parse_double(action[2]);
600   int recv_size = parse_double(action[3]);
601   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
602   if(action[4] && action[5]) {
603     MPI_CURRENT_TYPE=decode_datatype(action[5]);
604     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
605   } else {
606     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
607   }
608   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
609   void *recv = nullptr;
610   int root=0;
611   if(action[4])
612     root=atoi(action[4]);
613   int rank = MPI_COMM_WORLD->rank();
614
615   if(rank==root)
616     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
617
618   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
619   extra->type = TRACING_GATHER;
620   extra->send_size = send_size;
621   extra->recv_size = recv_size;
622   extra->root = root;
623   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
624   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
625
626   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
627
628   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
629
630   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
631   log_timed_action (action, clock);
632 }
633
634 static void action_gatherv(const char *const *action) {
635   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
636        0 gather 68 68 10 10 10 0 0 0
637      where:
638        1) 68 is the sendcount
639        2) 68 10 10 10 is the recvcounts
640        3) 0 is the root node
641        4) 0 is the send datatype id, see decode_datatype()
642        5) 0 is the recv datatype id, see decode_datatype()
643   */
644   double clock = smpi_process()->simulated_elapsed();
645   int comm_size = MPI_COMM_WORLD->size();
646   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
647   int send_size = parse_double(action[2]);
648   int disps[comm_size];
649   int recvcounts[comm_size];
650   int recv_sum=0;
651
652   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
653   if(action[4+comm_size] && action[5+comm_size]) {
654     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
655     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
656   } else
657     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
658
659   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
660   void *recv = nullptr;
661   for(int i=0;i<comm_size;i++) {
662     recvcounts[i] = atoi(action[i+3]);
663     recv_sum=recv_sum+recvcounts[i];
664     disps[i]=0;
665   }
666
667   int root=atoi(action[3+comm_size]);
668   int rank = MPI_COMM_WORLD->rank();
669
670   if(rank==root)
671     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
672
673   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
674   extra->type = TRACING_GATHERV;
675   extra->send_size = send_size;
676   extra->recvcounts= xbt_new(int,comm_size);
677   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
678     extra->recvcounts[i] = recvcounts[i];
679   extra->root = root;
680   extra->num_processes = comm_size;
681   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
682   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
683
684   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
685
686   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
687
688   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
689   log_timed_action (action, clock);
690 }
691
692 static void action_reducescatter(const char *const *action) {
693  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
694       0 reduceScatter 275427 275427 275427 204020 11346849 0
695     where:
696       1) The first four values after the name of the action declare the recvcounts array
697       2) The value 11346849 is the amount of instructions
698       3) The last value corresponds to the datatype, see decode_datatype().
699 */
700   double clock = smpi_process()->simulated_elapsed();
701   int comm_size = MPI_COMM_WORLD->size();
702   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
703   int comp_size = parse_double(action[2+comm_size]);
704   int recvcounts[comm_size];
705   int rank = smpi_process()->index();
706   int size = 0;
707   if(action[3+comm_size])
708     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
709   else
710     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
711
712   for(int i=0;i<comm_size;i++) {
713     recvcounts[i] = atoi(action[i+2]);
714     size+=recvcounts[i];
715   }
716
717   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
718   extra->type = TRACING_REDUCE_SCATTER;
719   extra->send_size = 0;
720   extra->recvcounts= xbt_new(int, comm_size);
721   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
722     extra->recvcounts[i] = recvcounts[i];
723   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
724   extra->comp_size = comp_size;
725   extra->num_processes = comm_size;
726
727   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
728
729   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
730   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
731
732   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
733   smpi_execute_flops(comp_size);
734
735   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
736   log_timed_action (action, clock);
737 }
738
739 static void action_allgather(const char *const *action) {
740   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
741         0 allGather 275427 275427
742     where:
743         1) 275427 is the sendcount
744         2) 275427 is the recvcount
745         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
746   */
747   double clock = smpi_process()->simulated_elapsed();
748
749   CHECK_ACTION_PARAMS(action, 2, 2)
750   int sendcount=atoi(action[2]); 
751   int recvcount=atoi(action[3]); 
752
753   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
754
755   if(action[4] && action[5]) {
756     MPI_CURRENT_TYPE = decode_datatype(action[4]);
757     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
758   } else
759     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
760
761   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
762   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
763
764   int rank = smpi_process()->index();
765   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
766   extra->type = TRACING_ALLGATHER;
767   extra->send_size = sendcount;
768   extra->recv_size= recvcount;
769   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
770   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
771   extra->num_processes = MPI_COMM_WORLD->size();
772
773   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
774
775   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
776
777   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
778   log_timed_action (action, clock);
779 }
780
781 static void action_allgatherv(const char *const *action) {
782   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
783         0 allGatherV 275427 275427 275427 275427 204020
784      where:
785         1) 275427 is the sendcount
786         2) The next four elements declare the recvcounts array
787         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
788   */
789   double clock = smpi_process()->simulated_elapsed();
790
791   int comm_size = MPI_COMM_WORLD->size();
792   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
793   int sendcount=atoi(action[2]);
794   int recvcounts[comm_size];
795   int disps[comm_size];
796   int recv_sum=0;
797   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
798
799   if(action[3+comm_size] && action[4+comm_size]) {
800     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
801     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
802   } else
803     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
804
805   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
806
807   for(int i=0;i<comm_size;i++) {
808     recvcounts[i] = atoi(action[i+3]);
809     recv_sum=recv_sum+recvcounts[i];
810     disps[i] = 0;
811   }
812   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
813
814   int rank = smpi_process()->index();
815   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
816   extra->type = TRACING_ALLGATHERV;
817   extra->send_size = sendcount;
818   extra->recvcounts= xbt_new(int, comm_size);
819   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
820     extra->recvcounts[i] = recvcounts[i];
821   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
822   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
823   extra->num_processes = comm_size;
824
825   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
826
827   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
828                           MPI_COMM_WORLD);
829
830   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
831   log_timed_action (action, clock);
832 }
833
834 static void action_allToAllv(const char *const *action) {
835   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
836         0 allToAllV 100 1 7 10 12 100 1 70 10 5
837      where:
838         1) 100 is the size of the send buffer *sizeof(int),
839         2) 1 7 10 12 is the sendcounts array
840         3) 100*sizeof(int) is the size of the receiver buffer
841         4)  1 70 10 5 is the recvcounts array
842   */
843   double clock = smpi_process()->simulated_elapsed();
844
845   int comm_size = MPI_COMM_WORLD->size();
846   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
847   int sendcounts[comm_size];
848   int recvcounts[comm_size];
849   int senddisps[comm_size];
850   int recvdisps[comm_size];
851
852   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
853
854   int send_buf_size=parse_double(action[2]);
855   int recv_buf_size=parse_double(action[3+comm_size]);
856   if(action[4+2*comm_size] && action[5+2*comm_size]) {
857     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
858     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
859   }
860   else
861     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
862
863   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
864   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
865
866   for(int i=0;i<comm_size;i++) {
867     sendcounts[i] = atoi(action[i+3]);
868     recvcounts[i] = atoi(action[i+4+comm_size]);
869     senddisps[i] = 0;
870     recvdisps[i] = 0;
871   }
872
873   int rank = smpi_process()->index();
874   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
875   extra->type = TRACING_ALLTOALLV;
876   extra->recvcounts= xbt_new(int, comm_size);
877   extra->sendcounts= xbt_new(int, comm_size);
878   extra->num_processes = comm_size;
879
880   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
881     extra->send_size += sendcounts[i];
882     extra->sendcounts[i] = sendcounts[i];
883     extra->recv_size += recvcounts[i];
884     extra->recvcounts[i] = recvcounts[i];
885   }
886   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
887   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
888
889   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
890
891   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
892                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
893
894   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
895   log_timed_action (action, clock);
896 }
897
898 }} // namespace simgrid::smpi
899
900 void smpi_replay_run(int *argc, char***argv){
901   /* First initializes everything */
902   simgrid::smpi::Process::init(argc, argv);
903   smpi_process()->mark_as_initialized();
904   smpi_process()->set_replaying(true);
905
906   int rank = smpi_process()->index();
907   TRACE_smpi_init(rank);
908   TRACE_smpi_computing_init(rank);
909   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
910   extra->type = TRACING_INIT;
911   char *operation =bprintf("%s_init",__FUNCTION__);
912   TRACE_smpi_collective_in(rank, -1, operation, extra);
913   TRACE_smpi_collective_out(rank, -1, operation);
914   xbt_free(operation);
915   xbt_replay_action_register("init",       simgrid::smpi::action_init);
916   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
917   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
918   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
919   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
920   xbt_replay_action_register("send",       simgrid::smpi::action_send);
921   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
922   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
923   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
924   xbt_replay_action_register("test",       simgrid::smpi::action_test);
925   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
926   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
927   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
928   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
929   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
930   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
931   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
932   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
933   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
934   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
935   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
936   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
937   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
938   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
939
940   //if we have a delayed start, sleep here.
941   if(*argc>2){
942     char *endptr;
943     double value = strtod((*argv)[2], &endptr);
944     if (*endptr != '\0')
945       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
946     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
947     smpi_execute_flops(value);
948   } else {
949     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
950     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
951     smpi_execute_flops(0.0);
952   }
953
954   /* Actually run the replay */
955   simgrid::xbt::replay_runner(*argc, *argv);
956
957   /* and now, finalize everything */
958   /* One active process will stop. Decrease the counter*/
959   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
960   if (!get_reqq_self()->empty()){
961     unsigned int count_requests=get_reqq_self()->size();
962     MPI_Request requests[count_requests];
963     MPI_Status status[count_requests];
964     unsigned int i=0;
965
966     for (auto req: *get_reqq_self()){
967       requests[i] = req;
968       i++;
969     }
970     simgrid::smpi::Request::waitall(count_requests, requests, status);
971   }
972   delete get_reqq_self();
973   active_processes--;
974
975   if(active_processes==0){
976     /* Last process alive speaking: end the simulated timer */
977     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
978     xbt_free(sendbuffer);
979     xbt_free(recvbuffer);
980   }
981
982   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
983   extra_fin->type = TRACING_FINALIZE;
984   operation =bprintf("%s_finalize",__FUNCTION__);
985   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
986
987   smpi_process()->finalize();
988
989   TRACE_smpi_collective_out(rank, -1, operation);
990   TRACE_smpi_finalize(smpi_process()->index());
991   smpi_process()->destroy();
992   xbt_free(operation);
993 }