Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
one step further on C++ization of replay
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.h"
7 #include "xbt/replay.hpp"
8 #include <unordered_map>
9 #include <vector>
10
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static int sendbuffer_size=0;
23 char* sendbuffer=nullptr;
24 static int recvbuffer_size=0;
25 char* recvbuffer=nullptr;
26
27 static void log_timed_action (const char *const *action, double clock){
28   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
29     char *name = xbt_str_join_array(action, " ");
30     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
31     xbt_free(name);
32   }
33 }
34
35 static std::vector<MPI_Request>* get_reqq_self()
36 {
37   return reqq.at(smpi_process_index());
38 }
39
40 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
41 {
42    reqq.insert({smpi_process_index(), mpi_request});
43 }
44
45 //allocate a single buffer for all sends, growing it if needed
46 void* smpi_get_tmp_sendbuffer(int size)
47 {
48   if (!smpi_process_get_replaying())
49     return xbt_malloc(size);
50   if (sendbuffer_size<size){
51     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
52     sendbuffer_size=size;
53   }
54   return sendbuffer;
55 }
56
57 //allocate a single buffer for all recv
58 void* smpi_get_tmp_recvbuffer(int size){
59   if (!smpi_process_get_replaying())
60     return xbt_malloc(size);
61   if (recvbuffer_size<size){
62     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
63     recvbuffer_size=size;
64   }
65   return recvbuffer;
66 }
67
68 void smpi_free_tmp_buffer(void* buf){
69   if (!smpi_process_get_replaying())
70     xbt_free(buf);
71 }
72
73 /* Helper function */
74 static double parse_double(const char *string)
75 {
76   char *endptr;
77   double value = strtod(string, &endptr);
78   if (*endptr != '\0')
79     THROWF(unknown_error, 0, "%s is not a double", string);
80   return value;
81 }
82
83 static MPI_Datatype decode_datatype(const char *const action)
84 {
85   switch(atoi(action)) {
86     case 0:
87       MPI_CURRENT_TYPE=MPI_DOUBLE;
88       break;
89     case 1:
90       MPI_CURRENT_TYPE=MPI_INT;
91       break;
92     case 2:
93       MPI_CURRENT_TYPE=MPI_CHAR;
94       break;
95     case 3:
96       MPI_CURRENT_TYPE=MPI_SHORT;
97       break;
98     case 4:
99       MPI_CURRENT_TYPE=MPI_LONG;
100       break;
101     case 5:
102       MPI_CURRENT_TYPE=MPI_FLOAT;
103       break;
104     case 6:
105       MPI_CURRENT_TYPE=MPI_BYTE;
106       break;
107     default:
108       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
109   }
110    return MPI_CURRENT_TYPE;
111 }
112
113 const char* encode_datatype(MPI_Datatype datatype, int* known)
114 {
115   //default type for output is set to MPI_BYTE
116   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
117   if(known!=nullptr)
118     *known=1;
119   if (datatype==MPI_BYTE)
120       return "";
121   if(datatype==MPI_DOUBLE)
122       return "0";
123   if(datatype==MPI_INT)
124       return "1";
125   if(datatype==MPI_CHAR)
126       return "2";
127   if(datatype==MPI_SHORT)
128       return "3";
129   if(datatype==MPI_LONG)
130     return "4";
131   if(datatype==MPI_FLOAT)
132       return "5";
133   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
134   if(known!=nullptr)
135     *known=0;
136   // default - not implemented.
137   // do not warn here as we pass in this function even for other trace formats
138   return "-1";
139 }
140
141 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
142     int i=0;\
143     while(action[i]!=nullptr)\
144      i++;\
145     if(i<mandatory+2)                                           \
146     THROWF(arg_error, 0, "%s replay failed.\n" \
147           "%d items were given on the line. First two should be process_id and action.  " \
148           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
149           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
150   }
151
152 static void action_init(const char *const *action)
153 {
154   XBT_DEBUG("Initialize the counters");
155   CHECK_ACTION_PARAMS(action, 0, 1)
156   if(action[2]) 
157     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
158   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
159
160   /* start a simulated timer */
161   smpi_process_simulated_start();
162   /*initialize the number of active processes */
163   active_processes = smpi_process_count();
164
165   set_reqq_self(new std::vector<MPI_Request>);
166 }
167
168 static void action_finalize(const char *const *action)
169 {
170   /* Nothing to do */
171 }
172
173 static void action_comm_size(const char *const *action)
174 {
175   communicator_size = parse_double(action[2]);
176   log_timed_action (action, smpi_process_simulated_elapsed());
177 }
178
179 static void action_comm_split(const char *const *action)
180 {
181   log_timed_action (action, smpi_process_simulated_elapsed());
182 }
183
184 static void action_comm_dup(const char *const *action)
185 {
186   log_timed_action (action, smpi_process_simulated_elapsed());
187 }
188
189 static void action_compute(const char *const *action)
190 {
191   CHECK_ACTION_PARAMS(action, 1, 0)
192   double clock = smpi_process_simulated_elapsed();
193   double flops= parse_double(action[2]);
194   int rank = smpi_process_index();
195   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
196   extra->type=TRACING_COMPUTING;
197   extra->comp_size=flops;
198   TRACE_smpi_computing_in(rank, extra);
199
200   smpi_execute_flops(flops);
201
202   TRACE_smpi_computing_out(rank);
203   log_timed_action (action, clock);
204 }
205
206 static void action_send(const char *const *action)
207 {
208   CHECK_ACTION_PARAMS(action, 2, 1)
209   int to = atoi(action[2]);
210   double size=parse_double(action[3]);
211   double clock = smpi_process_simulated_elapsed();
212
213   if(action[4])
214     MPI_CURRENT_TYPE=decode_datatype(action[4]);
215   else
216     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
217
218   int rank = smpi_process_index();
219
220   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
221   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
222   extra->type = TRACING_SEND;
223   extra->send_size = size;
224   extra->src = rank;
225   extra->dst = dst_traced;
226   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
227   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
228   if (!TRACE_smpi_view_internals())
229     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
230
231   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
232
233   log_timed_action (action, clock);
234
235   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
236 }
237
238 static void action_Isend(const char *const *action)
239 {
240   CHECK_ACTION_PARAMS(action, 2, 1)
241   int to = atoi(action[2]);
242   double size=parse_double(action[3]);
243   double clock = smpi_process_simulated_elapsed();
244
245   if(action[4]) 
246     MPI_CURRENT_TYPE=decode_datatype(action[4]);
247   else 
248     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
249
250   int rank = smpi_process_index();
251   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
252   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253   extra->type = TRACING_ISEND;
254   extra->send_size = size;
255   extra->src = rank;
256   extra->dst = dst_traced;
257   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
258   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
259   if (!TRACE_smpi_view_internals())
260     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
261
262   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
263
264   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
265
266   get_reqq_self()->push_back(request);
267
268   log_timed_action (action, clock);
269 }
270
271 static void action_recv(const char *const *action) {
272   CHECK_ACTION_PARAMS(action, 2, 1)
273   int from = atoi(action[2]);
274   double size=parse_double(action[3]);
275   double clock = smpi_process_simulated_elapsed();
276   MPI_Status status;
277
278   if(action[4]) 
279     MPI_CURRENT_TYPE=decode_datatype(action[4]);
280   else 
281     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
282
283   int rank = smpi_process_index();
284   int src_traced = MPI_COMM_WORLD->group()->rank(from);
285
286   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
287   extra->type = TRACING_RECV;
288   extra->send_size = size;
289   extra->src = src_traced;
290   extra->dst = rank;
291   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
292   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
293
294   //unknown size from the receiver point of view
295   if(size<=0.0){
296     Request::probe(from, 0, MPI_COMM_WORLD, &status);
297     size=status.count;
298   }
299
300   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
301
302   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
303   if (!TRACE_smpi_view_internals()) {
304     TRACE_smpi_recv(rank, src_traced, rank, 0);
305   }
306
307   log_timed_action (action, clock);
308 }
309
310 static void action_Irecv(const char *const *action)
311 {
312   CHECK_ACTION_PARAMS(action, 2, 1)
313   int from = atoi(action[2]);
314   double size=parse_double(action[3]);
315   double clock = smpi_process_simulated_elapsed();
316
317   if(action[4]) 
318     MPI_CURRENT_TYPE=decode_datatype(action[4]);
319   else 
320     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
321
322   int rank = smpi_process_index();
323   int src_traced = MPI_COMM_WORLD->group()->rank(from);
324   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
325   extra->type = TRACING_IRECV;
326   extra->send_size = size;
327   extra->src = src_traced;
328   extra->dst = rank;
329   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
330   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
331   MPI_Status status;
332   //unknow size from the receiver pov
333   if(size<=0.0){
334       Request::probe(from, 0, MPI_COMM_WORLD, &status);
335       size=status.count;
336   }
337
338   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
339
340   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
341   get_reqq_self()->push_back(request);
342
343   log_timed_action (action, clock);
344 }
345
346 static void action_test(const char *const *action){
347   CHECK_ACTION_PARAMS(action, 0, 0)
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Status status;
350
351   MPI_Request request = get_reqq_self()->back();
352   get_reqq_self()->pop_back();
353   //if request is null here, this may mean that a previous test has succeeded 
354   //Different times in traced application and replayed version may lead to this 
355   //In this case, ignore the extra calls.
356   if(request!=nullptr){
357     int rank = smpi_process_index();
358     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359     extra->type=TRACING_TEST;
360     TRACE_smpi_testing_in(rank, extra);
361
362     int flag = Request::test(&request, &status);
363
364     XBT_DEBUG("MPI_Test result: %d", flag);
365     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
366     get_reqq_self()->push_back(request);
367
368     TRACE_smpi_testing_out(rank);
369   }
370   log_timed_action (action, clock);
371 }
372
373 static void action_wait(const char *const *action){
374   CHECK_ACTION_PARAMS(action, 0, 0)
375   double clock = smpi_process_simulated_elapsed();
376   MPI_Status status;
377
378   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
379       xbt_str_join_array(action," "));
380   MPI_Request request = get_reqq_self()->back();
381   get_reqq_self()->pop_back();
382
383   if (request==nullptr){
384     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
385     return;
386   }
387
388   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
389
390   MPI_Group group = request->comm()->group();
391   int src_traced = group->rank(request->src());
392   int dst_traced = group->rank(request->dst());
393   int is_wait_for_receive = (request->flags() & RECV);
394   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
395   extra->type = TRACING_WAIT;
396   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
397
398   Request::wait(&request, &status);
399
400   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
401   if (is_wait_for_receive)
402     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
403   log_timed_action (action, clock);
404 }
405
406 static void action_waitall(const char *const *action){
407   CHECK_ACTION_PARAMS(action, 0, 0)
408   double clock = smpi_process_simulated_elapsed();
409   unsigned int count_requests=get_reqq_self()->size();
410
411   if (count_requests>0) {
412     MPI_Status status[count_requests];
413
414    int rank_traced = smpi_process_index();
415    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
416    extra->type = TRACING_WAITALL;
417    extra->send_size=count_requests;
418    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
419    int recvs_snd[count_requests];
420    int recvs_rcv[count_requests];
421    unsigned int i=0;
422    for (auto req : *(get_reqq_self())){
423      if (req && (req->flags () & RECV)){
424        recvs_snd[i]=req->src();
425        recvs_rcv[i]=req->dst();
426      }else
427        recvs_snd[i]=-100;
428      i++;
429    }
430    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
431
432    for (i=0; i<count_requests;i++){
433      if (recvs_snd[i]!=-100)
434        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
435    }
436    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
437   }
438   log_timed_action (action, clock);
439 }
440
441 static void action_barrier(const char *const *action){
442   double clock = smpi_process_simulated_elapsed();
443   int rank = smpi_process_index();
444   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
445   extra->type = TRACING_BARRIER;
446   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
447
448   mpi_coll_barrier_fun(MPI_COMM_WORLD);
449
450   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
451   log_timed_action (action, clock);
452 }
453
454 static void action_bcast(const char *const *action)
455 {
456   CHECK_ACTION_PARAMS(action, 1, 2)
457   double size = parse_double(action[2]);
458   double clock = smpi_process_simulated_elapsed();
459   int root=0;
460   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
461   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
462
463   if(action[3]) {
464     root= atoi(action[3]);
465     if(action[4])
466       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
467   }
468
469   int rank = smpi_process_index();
470   int root_traced = MPI_COMM_WORLD->group()->index(root);
471
472   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
473   extra->type = TRACING_BCAST;
474   extra->send_size = size;
475   extra->root = root_traced;
476   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
477   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
478   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
479
480   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
481
482   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
483   log_timed_action (action, clock);
484 }
485
486 static void action_reduce(const char *const *action)
487 {
488   CHECK_ACTION_PARAMS(action, 2, 2)
489   double comm_size = parse_double(action[2]);
490   double comp_size = parse_double(action[3]);
491   double clock = smpi_process_simulated_elapsed();
492   int root=0;
493   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
494
495   if(action[4]) {
496     root= atoi(action[4]);
497     if(action[5])
498       MPI_CURRENT_TYPE=decode_datatype(action[5]);
499   }
500
501   int rank = smpi_process_index();
502   int root_traced = MPI_COMM_WORLD->group()->rank(root);
503   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
504   extra->type = TRACING_REDUCE;
505   extra->send_size = comm_size;
506   extra->comp_size = comp_size;
507   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
508   extra->root = root_traced;
509
510   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
511
512   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
513   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
514   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
515   smpi_execute_flops(comp_size);
516
517   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
518   log_timed_action (action, clock);
519 }
520
521 static void action_allReduce(const char *const *action) {
522   CHECK_ACTION_PARAMS(action, 2, 1)
523   double comm_size = parse_double(action[2]);
524   double comp_size = parse_double(action[3]);
525
526   if(action[4])
527     MPI_CURRENT_TYPE=decode_datatype(action[4]);
528   else
529     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
530
531   double clock = smpi_process_simulated_elapsed();
532   int rank = smpi_process_index();
533   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
534   extra->type = TRACING_ALLREDUCE;
535   extra->send_size = comm_size;
536   extra->comp_size = comp_size;
537   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
538   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
539
540   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
541   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
542   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
543   smpi_execute_flops(comp_size);
544
545   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
546   log_timed_action (action, clock);
547 }
548
549 static void action_allToAll(const char *const *action) {
550   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
551   double clock = smpi_process_simulated_elapsed();
552   int comm_size = MPI_COMM_WORLD->size();
553   int send_size = parse_double(action[2]);
554   int recv_size = parse_double(action[3]);
555   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
556
557   if(action[4] && action[5]) {
558     MPI_CURRENT_TYPE=decode_datatype(action[4]);
559     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
560   }
561   else
562     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
563
564   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
565   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
566
567   int rank = smpi_process_index();
568   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
569   extra->type = TRACING_ALLTOALL;
570   extra->send_size = send_size;
571   extra->recv_size = recv_size;
572   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
573   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
574
575   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
576
577   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
578
579   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
580   log_timed_action (action, clock);
581 }
582
583 static void action_gather(const char *const *action) {
584   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
585         0 gather 68 68 0 0 0
586       where:
587         1) 68 is the sendcounts
588         2) 68 is the recvcounts
589         3) 0 is the root node
590         4) 0 is the send datatype id, see decode_datatype()
591         5) 0 is the recv datatype id, see decode_datatype()
592   */
593   CHECK_ACTION_PARAMS(action, 2, 3)
594   double clock = smpi_process_simulated_elapsed();
595   int comm_size = MPI_COMM_WORLD->size();
596   int send_size = parse_double(action[2]);
597   int recv_size = parse_double(action[3]);
598   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
599   if(action[4] && action[5]) {
600     MPI_CURRENT_TYPE=decode_datatype(action[5]);
601     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
602   } else {
603     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
604   }
605   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
606   void *recv = nullptr;
607   int root=0;
608   if(action[4])
609     root=atoi(action[4]);
610   int rank = MPI_COMM_WORLD->rank();
611
612   if(rank==root)
613     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
614
615   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
616   extra->type = TRACING_GATHER;
617   extra->send_size = send_size;
618   extra->recv_size = recv_size;
619   extra->root = root;
620   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
621   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
622
623   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
624
625   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
626
627   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
628   log_timed_action (action, clock);
629 }
630
631 static void action_gatherv(const char *const *action) {
632   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
633        0 gather 68 68 10 10 10 0 0 0
634      where:
635        1) 68 is the sendcount
636        2) 68 10 10 10 is the recvcounts
637        3) 0 is the root node
638        4) 0 is the send datatype id, see decode_datatype()
639        5) 0 is the recv datatype id, see decode_datatype()
640   */
641   double clock = smpi_process_simulated_elapsed();
642   int comm_size = MPI_COMM_WORLD->size();
643   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
644   int send_size = parse_double(action[2]);
645   int disps[comm_size];
646   int recvcounts[comm_size];
647   int recv_sum=0;
648
649   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
650   if(action[4+comm_size] && action[5+comm_size]) {
651     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
652     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
653   } else
654     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
655
656   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
657   void *recv = nullptr;
658   for(int i=0;i<comm_size;i++) {
659     recvcounts[i] = atoi(action[i+3]);
660     recv_sum=recv_sum+recvcounts[i];
661     disps[i]=0;
662   }
663
664   int root=atoi(action[3+comm_size]);
665   int rank = MPI_COMM_WORLD->rank();
666
667   if(rank==root)
668     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
669
670   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
671   extra->type = TRACING_GATHERV;
672   extra->send_size = send_size;
673   extra->recvcounts= xbt_new(int,comm_size);
674   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
675     extra->recvcounts[i] = recvcounts[i];
676   extra->root = root;
677   extra->num_processes = comm_size;
678   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
679   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
680
681   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
682
683   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
684
685   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
686   log_timed_action (action, clock);
687 }
688
689 static void action_reducescatter(const char *const *action) {
690  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
691       0 reduceScatter 275427 275427 275427 204020 11346849 0
692     where:
693       1) The first four values after the name of the action declare the recvcounts array
694       2) The value 11346849 is the amount of instructions
695       3) The last value corresponds to the datatype, see decode_datatype().
696 */
697   double clock = smpi_process_simulated_elapsed();
698   int comm_size = MPI_COMM_WORLD->size();
699   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
700   int comp_size = parse_double(action[2+comm_size]);
701   int recvcounts[comm_size];
702   int rank = smpi_process_index();
703   int size = 0;
704   if(action[3+comm_size])
705     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
706   else
707     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
708
709   for(int i=0;i<comm_size;i++) {
710     recvcounts[i] = atoi(action[i+2]);
711     size+=recvcounts[i];
712   }
713
714   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
715   extra->type = TRACING_REDUCE_SCATTER;
716   extra->send_size = 0;
717   extra->recvcounts= xbt_new(int, comm_size);
718   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
719     extra->recvcounts[i] = recvcounts[i];
720   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
721   extra->comp_size = comp_size;
722   extra->num_processes = comm_size;
723
724   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
725
726   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
727   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
728
729   mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
730   smpi_execute_flops(comp_size);
731
732   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
733   log_timed_action (action, clock);
734 }
735
736 static void action_allgather(const char *const *action) {
737   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
738         0 allGather 275427 275427
739     where:
740         1) 275427 is the sendcount
741         2) 275427 is the recvcount
742         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
743   */
744   double clock = smpi_process_simulated_elapsed();
745
746   CHECK_ACTION_PARAMS(action, 2, 2)
747   int sendcount=atoi(action[2]); 
748   int recvcount=atoi(action[3]); 
749
750   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
751
752   if(action[4] && action[5]) {
753     MPI_CURRENT_TYPE = decode_datatype(action[4]);
754     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
755   } else
756     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
757
758   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
759   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
760
761   int rank = smpi_process_index();
762   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
763   extra->type = TRACING_ALLGATHER;
764   extra->send_size = sendcount;
765   extra->recv_size= recvcount;
766   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
767   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
768   extra->num_processes = MPI_COMM_WORLD->size();
769
770   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
771
772   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
773
774   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
775   log_timed_action (action, clock);
776 }
777
778 static void action_allgatherv(const char *const *action) {
779   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
780         0 allGatherV 275427 275427 275427 275427 204020
781      where:
782         1) 275427 is the sendcount
783         2) The next four elements declare the recvcounts array
784         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
785   */
786   double clock = smpi_process_simulated_elapsed();
787
788   int comm_size = MPI_COMM_WORLD->size();
789   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
790   int sendcount=atoi(action[2]);
791   int recvcounts[comm_size];
792   int disps[comm_size];
793   int recv_sum=0;
794   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
795
796   if(action[3+comm_size] && action[4+comm_size]) {
797     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
798     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
799   } else
800     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
801
802   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
803
804   for(int i=0;i<comm_size;i++) {
805     recvcounts[i] = atoi(action[i+3]);
806     recv_sum=recv_sum+recvcounts[i];
807     disps[i] = 0;
808   }
809   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
810
811   int rank = smpi_process_index();
812   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
813   extra->type = TRACING_ALLGATHERV;
814   extra->send_size = sendcount;
815   extra->recvcounts= xbt_new(int, comm_size);
816   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
817     extra->recvcounts[i] = recvcounts[i];
818   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
819   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
820   extra->num_processes = comm_size;
821
822   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
823
824   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
825                           MPI_COMM_WORLD);
826
827   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
828   log_timed_action (action, clock);
829 }
830
831 static void action_allToAllv(const char *const *action) {
832   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
833         0 allToAllV 100 1 7 10 12 100 1 70 10 5
834      where:
835         1) 100 is the size of the send buffer *sizeof(int),
836         2) 1 7 10 12 is the sendcounts array
837         3) 100*sizeof(int) is the size of the receiver buffer
838         4)  1 70 10 5 is the recvcounts array
839   */
840   double clock = smpi_process_simulated_elapsed();
841
842   int comm_size = MPI_COMM_WORLD->size();
843   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
844   int sendcounts[comm_size];
845   int recvcounts[comm_size];
846   int senddisps[comm_size];
847   int recvdisps[comm_size];
848
849   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
850
851   int send_buf_size=parse_double(action[2]);
852   int recv_buf_size=parse_double(action[3+comm_size]);
853   if(action[4+2*comm_size] && action[5+2*comm_size]) {
854     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
855     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
856   }
857   else
858     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
859
860   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
861   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
862
863   for(int i=0;i<comm_size;i++) {
864     sendcounts[i] = atoi(action[i+3]);
865     recvcounts[i] = atoi(action[i+4+comm_size]);
866     senddisps[i] = 0;
867     recvdisps[i] = 0;
868   }
869
870   int rank = smpi_process_index();
871   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
872   extra->type = TRACING_ALLTOALLV;
873   extra->recvcounts= xbt_new(int, comm_size);
874   extra->sendcounts= xbt_new(int, comm_size);
875   extra->num_processes = comm_size;
876
877   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
878     extra->send_size += sendcounts[i];
879     extra->sendcounts[i] = sendcounts[i];
880     extra->recv_size += recvcounts[i];
881     extra->recvcounts[i] = recvcounts[i];
882   }
883   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
884   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
885
886   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
887
888   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
889                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
890
891   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
892   log_timed_action (action, clock);
893 }
894
895 void smpi_replay_run(int *argc, char***argv){
896   /* First initializes everything */
897   smpi_process_init(argc, argv);
898   smpi_process_mark_as_initialized();
899   smpi_process_set_replaying(true);
900
901   int rank = smpi_process_index();
902   TRACE_smpi_init(rank);
903   TRACE_smpi_computing_init(rank);
904   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
905   extra->type = TRACING_INIT;
906   char *operation =bprintf("%s_init",__FUNCTION__);
907   TRACE_smpi_collective_in(rank, -1, operation, extra);
908   TRACE_smpi_collective_out(rank, -1, operation);
909   xbt_free(operation);
910   _xbt_replay_action_init();
911   xbt_replay_action_register("init",       action_init);
912   xbt_replay_action_register("finalize",   action_finalize);
913   xbt_replay_action_register("comm_size",  action_comm_size);
914   xbt_replay_action_register("comm_split", action_comm_split);
915   xbt_replay_action_register("comm_dup",   action_comm_dup);
916   xbt_replay_action_register("send",       action_send);
917   xbt_replay_action_register("Isend",      action_Isend);
918   xbt_replay_action_register("recv",       action_recv);
919   xbt_replay_action_register("Irecv",      action_Irecv);
920   xbt_replay_action_register("test",       action_test);
921   xbt_replay_action_register("wait",       action_wait);
922   xbt_replay_action_register("waitAll",    action_waitall);
923   xbt_replay_action_register("barrier",    action_barrier);
924   xbt_replay_action_register("bcast",      action_bcast);
925   xbt_replay_action_register("reduce",     action_reduce);
926   xbt_replay_action_register("allReduce",  action_allReduce);
927   xbt_replay_action_register("allToAll",   action_allToAll);
928   xbt_replay_action_register("allToAllV",  action_allToAllv);
929   xbt_replay_action_register("gather",  action_gather);
930   xbt_replay_action_register("gatherV",  action_gatherv);
931   xbt_replay_action_register("allGather",  action_allgather);
932   xbt_replay_action_register("allGatherV",  action_allgatherv);
933   xbt_replay_action_register("reduceScatter",  action_reducescatter);
934   xbt_replay_action_register("compute",    action_compute);
935
936   //if we have a delayed start, sleep here.
937   if(*argc>2){
938     char *endptr;
939     double value = strtod((*argv)[2], &endptr);
940     if (*endptr != '\0')
941       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
942     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
943     smpi_execute_flops(value);
944   } else {
945     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
946     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
947     smpi_execute_flops(0.0);
948   }
949
950   /* Actually run the replay */
951   xbt_replay_action_runner(*argc, *argv);
952
953   /* and now, finalize everything */
954   /* One active process will stop. Decrease the counter*/
955   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
956   if (!get_reqq_self()->empty()){
957     unsigned int count_requests=get_reqq_self()->size();
958     MPI_Request requests[count_requests];
959     MPI_Status status[count_requests];
960     unsigned int i=0;
961
962     for (auto req: *get_reqq_self()){
963       requests[i] = req;
964       i++;
965     }
966     Request::waitall(count_requests, requests, status);
967   }
968   delete get_reqq_self();
969   active_processes--;
970
971   if(active_processes==0){
972     /* Last process alive speaking: end the simulated timer */
973     XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
974     _xbt_replay_action_exit();
975     xbt_free(sendbuffer);
976     xbt_free(recvbuffer);
977   }
978
979   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
980   extra_fin->type = TRACING_FINALIZE;
981   operation =bprintf("%s_finalize",__FUNCTION__);
982   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
983
984   smpi_process_finalize();
985
986   TRACE_smpi_collective_out(rank, -1, operation);
987   TRACE_smpi_finalize(smpi_process_index());
988   smpi_process_destroy();
989   xbt_free(operation);
990 }