Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f3f7ff7be7eeeba3d700251e5c0b5e20ccbe5671
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.h"
7 #include "xbt/replay.hpp"
8 #include <unordered_map>
9 #include <vector>
10
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
12
13 using namespace simgrid::smpi;
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
16
17 int communicator_size = 0;
18 static int active_processes = 0;
19 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
20
21 MPI_Datatype MPI_DEFAULT_TYPE;
22 MPI_Datatype MPI_CURRENT_TYPE;
23
24 static int sendbuffer_size=0;
25 char* sendbuffer=nullptr;
26 static int recvbuffer_size=0;
27 char* recvbuffer=nullptr;
28
29 static void log_timed_action (const char *const *action, double clock){
30   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
31     char *name = xbt_str_join_array(action, " ");
32     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
33     xbt_free(name);
34   }
35 }
36
37 static std::vector<MPI_Request>* get_reqq_self()
38 {
39   return reqq.at(smpi_process()->index());
40 }
41
42 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
43 {
44    reqq.insert({smpi_process()->index(), mpi_request});
45 }
46
47 //allocate a single buffer for all sends, growing it if needed
48 void* smpi_get_tmp_sendbuffer(int size)
49 {
50   if (!smpi_process()->replaying())
51     return xbt_malloc(size);
52   if (sendbuffer_size<size){
53     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
54     sendbuffer_size=size;
55   }
56   return sendbuffer;
57 }
58
59 //allocate a single buffer for all recv
60 void* smpi_get_tmp_recvbuffer(int size){
61   if (!smpi_process()->replaying())
62     return xbt_malloc(size);
63   if (recvbuffer_size<size){
64     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
65     recvbuffer_size=size;
66   }
67   return recvbuffer;
68 }
69
70 void smpi_free_tmp_buffer(void* buf){
71   if (!smpi_process()->replaying())
72     xbt_free(buf);
73 }
74
75 /* Helper function */
76 static double parse_double(const char *string)
77 {
78   char *endptr;
79   double value = strtod(string, &endptr);
80   if (*endptr != '\0')
81     THROWF(unknown_error, 0, "%s is not a double", string);
82   return value;
83 }
84
85 static MPI_Datatype decode_datatype(const char *const action)
86 {
87   switch(atoi(action)) {
88     case 0:
89       MPI_CURRENT_TYPE=MPI_DOUBLE;
90       break;
91     case 1:
92       MPI_CURRENT_TYPE=MPI_INT;
93       break;
94     case 2:
95       MPI_CURRENT_TYPE=MPI_CHAR;
96       break;
97     case 3:
98       MPI_CURRENT_TYPE=MPI_SHORT;
99       break;
100     case 4:
101       MPI_CURRENT_TYPE=MPI_LONG;
102       break;
103     case 5:
104       MPI_CURRENT_TYPE=MPI_FLOAT;
105       break;
106     case 6:
107       MPI_CURRENT_TYPE=MPI_BYTE;
108       break;
109     default:
110       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
111   }
112    return MPI_CURRENT_TYPE;
113 }
114
115 const char* encode_datatype(MPI_Datatype datatype, int* known)
116 {
117   //default type for output is set to MPI_BYTE
118   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
119   if(known!=nullptr)
120     *known=1;
121   if (datatype==MPI_BYTE)
122       return "";
123   if(datatype==MPI_DOUBLE)
124       return "0";
125   if(datatype==MPI_INT)
126       return "1";
127   if(datatype==MPI_CHAR)
128       return "2";
129   if(datatype==MPI_SHORT)
130       return "3";
131   if(datatype==MPI_LONG)
132     return "4";
133   if(datatype==MPI_FLOAT)
134       return "5";
135   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
136   if(known!=nullptr)
137     *known=0;
138   // default - not implemented.
139   // do not warn here as we pass in this function even for other trace formats
140   return "-1";
141 }
142
143 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
144     int i=0;\
145     while(action[i]!=nullptr)\
146      i++;\
147     if(i<mandatory+2)                                           \
148     THROWF(arg_error, 0, "%s replay failed.\n" \
149           "%d items were given on the line. First two should be process_id and action.  " \
150           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
151           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
152   }
153
154 static void action_init(const char *const *action)
155 {
156   XBT_DEBUG("Initialize the counters");
157   CHECK_ACTION_PARAMS(action, 0, 1)
158   if(action[2]) 
159     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
160   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
161
162   /* start a simulated timer */
163   smpi_process()->simulated_start();
164   /*initialize the number of active processes */
165   active_processes = smpi_process_count();
166
167   set_reqq_self(new std::vector<MPI_Request>);
168 }
169
170 static void action_finalize(const char *const *action)
171 {
172   /* Nothing to do */
173 }
174
175 static void action_comm_size(const char *const *action)
176 {
177   communicator_size = parse_double(action[2]);
178   log_timed_action (action, smpi_process()->simulated_elapsed());
179 }
180
181 static void action_comm_split(const char *const *action)
182 {
183   log_timed_action (action, smpi_process()->simulated_elapsed());
184 }
185
186 static void action_comm_dup(const char *const *action)
187 {
188   log_timed_action (action, smpi_process()->simulated_elapsed());
189 }
190
191 static void action_compute(const char *const *action)
192 {
193   CHECK_ACTION_PARAMS(action, 1, 0)
194   double clock = smpi_process()->simulated_elapsed();
195   double flops= parse_double(action[2]);
196   int rank = smpi_process()->index();
197   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
198   extra->type=TRACING_COMPUTING;
199   extra->comp_size=flops;
200   TRACE_smpi_computing_in(rank, extra);
201
202   smpi_execute_flops(flops);
203
204   TRACE_smpi_computing_out(rank);
205   log_timed_action (action, clock);
206 }
207
208 static void action_send(const char *const *action)
209 {
210   CHECK_ACTION_PARAMS(action, 2, 1)
211   int to = atoi(action[2]);
212   double size=parse_double(action[3]);
213   double clock = smpi_process()->simulated_elapsed();
214
215   if(action[4])
216     MPI_CURRENT_TYPE=decode_datatype(action[4]);
217   else
218     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
219
220   int rank = smpi_process()->index();
221
222   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
223   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
224   extra->type = TRACING_SEND;
225   extra->send_size = size;
226   extra->src = rank;
227   extra->dst = dst_traced;
228   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
229   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
230   if (!TRACE_smpi_view_internals())
231     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
232
233   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
234
235   log_timed_action (action, clock);
236
237   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
238 }
239
240 static void action_Isend(const char *const *action)
241 {
242   CHECK_ACTION_PARAMS(action, 2, 1)
243   int to = atoi(action[2]);
244   double size=parse_double(action[3]);
245   double clock = smpi_process()->simulated_elapsed();
246
247   if(action[4]) 
248     MPI_CURRENT_TYPE=decode_datatype(action[4]);
249   else 
250     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
251
252   int rank = smpi_process()->index();
253   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
254   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
255   extra->type = TRACING_ISEND;
256   extra->send_size = size;
257   extra->src = rank;
258   extra->dst = dst_traced;
259   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
260   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
261   if (!TRACE_smpi_view_internals())
262     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
263
264   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
265
266   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
267
268   get_reqq_self()->push_back(request);
269
270   log_timed_action (action, clock);
271 }
272
273 static void action_recv(const char *const *action) {
274   CHECK_ACTION_PARAMS(action, 2, 1)
275   int from = atoi(action[2]);
276   double size=parse_double(action[3]);
277   double clock = smpi_process()->simulated_elapsed();
278   MPI_Status status;
279
280   if(action[4]) 
281     MPI_CURRENT_TYPE=decode_datatype(action[4]);
282   else 
283     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
284
285   int rank = smpi_process()->index();
286   int src_traced = MPI_COMM_WORLD->group()->rank(from);
287
288   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
289   extra->type = TRACING_RECV;
290   extra->send_size = size;
291   extra->src = src_traced;
292   extra->dst = rank;
293   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
294   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
295
296   //unknown size from the receiver point of view
297   if(size<=0.0){
298     Request::probe(from, 0, MPI_COMM_WORLD, &status);
299     size=status.count;
300   }
301
302   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
303
304   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
305   if (!TRACE_smpi_view_internals()) {
306     TRACE_smpi_recv(rank, src_traced, rank, 0);
307   }
308
309   log_timed_action (action, clock);
310 }
311
312 static void action_Irecv(const char *const *action)
313 {
314   CHECK_ACTION_PARAMS(action, 2, 1)
315   int from = atoi(action[2]);
316   double size=parse_double(action[3]);
317   double clock = smpi_process()->simulated_elapsed();
318
319   if(action[4]) 
320     MPI_CURRENT_TYPE=decode_datatype(action[4]);
321   else 
322     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
323
324   int rank = smpi_process()->index();
325   int src_traced = MPI_COMM_WORLD->group()->rank(from);
326   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
327   extra->type = TRACING_IRECV;
328   extra->send_size = size;
329   extra->src = src_traced;
330   extra->dst = rank;
331   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
332   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
333   MPI_Status status;
334   //unknow size from the receiver pov
335   if(size<=0.0){
336       Request::probe(from, 0, MPI_COMM_WORLD, &status);
337       size=status.count;
338   }
339
340   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
341
342   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
343   get_reqq_self()->push_back(request);
344
345   log_timed_action (action, clock);
346 }
347
348 static void action_test(const char *const *action){
349   CHECK_ACTION_PARAMS(action, 0, 0)
350   double clock = smpi_process()->simulated_elapsed();
351   MPI_Status status;
352
353   MPI_Request request = get_reqq_self()->back();
354   get_reqq_self()->pop_back();
355   //if request is null here, this may mean that a previous test has succeeded 
356   //Different times in traced application and replayed version may lead to this 
357   //In this case, ignore the extra calls.
358   if(request!=nullptr){
359     int rank = smpi_process()->index();
360     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361     extra->type=TRACING_TEST;
362     TRACE_smpi_testing_in(rank, extra);
363
364     int flag = Request::test(&request, &status);
365
366     XBT_DEBUG("MPI_Test result: %d", flag);
367     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
368     get_reqq_self()->push_back(request);
369
370     TRACE_smpi_testing_out(rank);
371   }
372   log_timed_action (action, clock);
373 }
374
375 static void action_wait(const char *const *action){
376   CHECK_ACTION_PARAMS(action, 0, 0)
377   double clock = smpi_process()->simulated_elapsed();
378   MPI_Status status;
379
380   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
381       xbt_str_join_array(action," "));
382   MPI_Request request = get_reqq_self()->back();
383   get_reqq_self()->pop_back();
384
385   if (request==nullptr){
386     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
387     return;
388   }
389
390   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
391
392   MPI_Group group = request->comm()->group();
393   int src_traced = group->rank(request->src());
394   int dst_traced = group->rank(request->dst());
395   int is_wait_for_receive = (request->flags() & RECV);
396   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
397   extra->type = TRACING_WAIT;
398   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
399
400   Request::wait(&request, &status);
401
402   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
403   if (is_wait_for_receive)
404     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
405   log_timed_action (action, clock);
406 }
407
408 static void action_waitall(const char *const *action){
409   CHECK_ACTION_PARAMS(action, 0, 0)
410   double clock = smpi_process()->simulated_elapsed();
411   unsigned int count_requests=get_reqq_self()->size();
412
413   if (count_requests>0) {
414     MPI_Status status[count_requests];
415
416    int rank_traced = smpi_process()->index();
417    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
418    extra->type = TRACING_WAITALL;
419    extra->send_size=count_requests;
420    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
421    int recvs_snd[count_requests];
422    int recvs_rcv[count_requests];
423    unsigned int i=0;
424    for (auto req : *(get_reqq_self())){
425      if (req && (req->flags () & RECV)){
426        recvs_snd[i]=req->src();
427        recvs_rcv[i]=req->dst();
428      }else
429        recvs_snd[i]=-100;
430      i++;
431    }
432    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
433
434    for (i=0; i<count_requests;i++){
435      if (recvs_snd[i]!=-100)
436        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
437    }
438    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
439   }
440   log_timed_action (action, clock);
441 }
442
443 static void action_barrier(const char *const *action){
444   double clock = smpi_process()->simulated_elapsed();
445   int rank = smpi_process()->index();
446   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
447   extra->type = TRACING_BARRIER;
448   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
449
450   Colls::barrier(MPI_COMM_WORLD);
451
452   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
453   log_timed_action (action, clock);
454 }
455
456 static void action_bcast(const char *const *action)
457 {
458   CHECK_ACTION_PARAMS(action, 1, 2)
459   double size = parse_double(action[2]);
460   double clock = smpi_process()->simulated_elapsed();
461   int root=0;
462   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
463   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
464
465   if(action[3]) {
466     root= atoi(action[3]);
467     if(action[4])
468       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
469   }
470
471   int rank = smpi_process()->index();
472   int root_traced = MPI_COMM_WORLD->group()->index(root);
473
474   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
475   extra->type = TRACING_BCAST;
476   extra->send_size = size;
477   extra->root = root_traced;
478   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
479   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
480   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
481
482   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
483
484   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
485   log_timed_action (action, clock);
486 }
487
488 static void action_reduce(const char *const *action)
489 {
490   CHECK_ACTION_PARAMS(action, 2, 2)
491   double comm_size = parse_double(action[2]);
492   double comp_size = parse_double(action[3]);
493   double clock = smpi_process()->simulated_elapsed();
494   int root=0;
495   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
496
497   if(action[4]) {
498     root= atoi(action[4]);
499     if(action[5])
500       MPI_CURRENT_TYPE=decode_datatype(action[5]);
501   }
502
503   int rank = smpi_process()->index();
504   int root_traced = MPI_COMM_WORLD->group()->rank(root);
505   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
506   extra->type = TRACING_REDUCE;
507   extra->send_size = comm_size;
508   extra->comp_size = comp_size;
509   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
510   extra->root = root_traced;
511
512   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
513
514   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
515   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
516   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
517   smpi_execute_flops(comp_size);
518
519   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
520   log_timed_action (action, clock);
521 }
522
523 static void action_allReduce(const char *const *action) {
524   CHECK_ACTION_PARAMS(action, 2, 1)
525   double comm_size = parse_double(action[2]);
526   double comp_size = parse_double(action[3]);
527
528   if(action[4])
529     MPI_CURRENT_TYPE=decode_datatype(action[4]);
530   else
531     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
532
533   double clock = smpi_process()->simulated_elapsed();
534   int rank = smpi_process()->index();
535   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
536   extra->type = TRACING_ALLREDUCE;
537   extra->send_size = comm_size;
538   extra->comp_size = comp_size;
539   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
540   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
541
542   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
543   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
544   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
545   smpi_execute_flops(comp_size);
546
547   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
548   log_timed_action (action, clock);
549 }
550
551 static void action_allToAll(const char *const *action) {
552   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
553   double clock = smpi_process()->simulated_elapsed();
554   int comm_size = MPI_COMM_WORLD->size();
555   int send_size = parse_double(action[2]);
556   int recv_size = parse_double(action[3]);
557   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
558
559   if(action[4] && action[5]) {
560     MPI_CURRENT_TYPE=decode_datatype(action[4]);
561     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
562   }
563   else
564     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
565
566   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
567   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
568
569   int rank = smpi_process()->index();
570   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
571   extra->type = TRACING_ALLTOALL;
572   extra->send_size = send_size;
573   extra->recv_size = recv_size;
574   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
575   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
576
577   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
578
579   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
580
581   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
582   log_timed_action (action, clock);
583 }
584
585 static void action_gather(const char *const *action) {
586   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
587         0 gather 68 68 0 0 0
588       where:
589         1) 68 is the sendcounts
590         2) 68 is the recvcounts
591         3) 0 is the root node
592         4) 0 is the send datatype id, see decode_datatype()
593         5) 0 is the recv datatype id, see decode_datatype()
594   */
595   CHECK_ACTION_PARAMS(action, 2, 3)
596   double clock = smpi_process()->simulated_elapsed();
597   int comm_size = MPI_COMM_WORLD->size();
598   int send_size = parse_double(action[2]);
599   int recv_size = parse_double(action[3]);
600   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
601   if(action[4] && action[5]) {
602     MPI_CURRENT_TYPE=decode_datatype(action[5]);
603     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
604   } else {
605     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
606   }
607   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
608   void *recv = nullptr;
609   int root=0;
610   if(action[4])
611     root=atoi(action[4]);
612   int rank = MPI_COMM_WORLD->rank();
613
614   if(rank==root)
615     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
616
617   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
618   extra->type = TRACING_GATHER;
619   extra->send_size = send_size;
620   extra->recv_size = recv_size;
621   extra->root = root;
622   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
623   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
624
625   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
626
627   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
628
629   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
630   log_timed_action (action, clock);
631 }
632
633 static void action_gatherv(const char *const *action) {
634   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
635        0 gather 68 68 10 10 10 0 0 0
636      where:
637        1) 68 is the sendcount
638        2) 68 10 10 10 is the recvcounts
639        3) 0 is the root node
640        4) 0 is the send datatype id, see decode_datatype()
641        5) 0 is the recv datatype id, see decode_datatype()
642   */
643   double clock = smpi_process()->simulated_elapsed();
644   int comm_size = MPI_COMM_WORLD->size();
645   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
646   int send_size = parse_double(action[2]);
647   int disps[comm_size];
648   int recvcounts[comm_size];
649   int recv_sum=0;
650
651   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
652   if(action[4+comm_size] && action[5+comm_size]) {
653     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
654     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
655   } else
656     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
657
658   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
659   void *recv = nullptr;
660   for(int i=0;i<comm_size;i++) {
661     recvcounts[i] = atoi(action[i+3]);
662     recv_sum=recv_sum+recvcounts[i];
663     disps[i]=0;
664   }
665
666   int root=atoi(action[3+comm_size]);
667   int rank = MPI_COMM_WORLD->rank();
668
669   if(rank==root)
670     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
671
672   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
673   extra->type = TRACING_GATHERV;
674   extra->send_size = send_size;
675   extra->recvcounts= xbt_new(int,comm_size);
676   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
677     extra->recvcounts[i] = recvcounts[i];
678   extra->root = root;
679   extra->num_processes = comm_size;
680   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
681   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
682
683   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
684
685   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
686
687   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
688   log_timed_action (action, clock);
689 }
690
691 static void action_reducescatter(const char *const *action) {
692  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
693       0 reduceScatter 275427 275427 275427 204020 11346849 0
694     where:
695       1) The first four values after the name of the action declare the recvcounts array
696       2) The value 11346849 is the amount of instructions
697       3) The last value corresponds to the datatype, see decode_datatype().
698 */
699   double clock = smpi_process()->simulated_elapsed();
700   int comm_size = MPI_COMM_WORLD->size();
701   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
702   int comp_size = parse_double(action[2+comm_size]);
703   int recvcounts[comm_size];
704   int rank = smpi_process()->index();
705   int size = 0;
706   if(action[3+comm_size])
707     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
708   else
709     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
710
711   for(int i=0;i<comm_size;i++) {
712     recvcounts[i] = atoi(action[i+2]);
713     size+=recvcounts[i];
714   }
715
716   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
717   extra->type = TRACING_REDUCE_SCATTER;
718   extra->send_size = 0;
719   extra->recvcounts= xbt_new(int, comm_size);
720   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
721     extra->recvcounts[i] = recvcounts[i];
722   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
723   extra->comp_size = comp_size;
724   extra->num_processes = comm_size;
725
726   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
727
728   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
729   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
730
731   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
732   smpi_execute_flops(comp_size);
733
734   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
735   log_timed_action (action, clock);
736 }
737
738 static void action_allgather(const char *const *action) {
739   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
740         0 allGather 275427 275427
741     where:
742         1) 275427 is the sendcount
743         2) 275427 is the recvcount
744         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
745   */
746   double clock = smpi_process()->simulated_elapsed();
747
748   CHECK_ACTION_PARAMS(action, 2, 2)
749   int sendcount=atoi(action[2]); 
750   int recvcount=atoi(action[3]); 
751
752   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
753
754   if(action[4] && action[5]) {
755     MPI_CURRENT_TYPE = decode_datatype(action[4]);
756     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
757   } else
758     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
759
760   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
761   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
762
763   int rank = smpi_process()->index();
764   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
765   extra->type = TRACING_ALLGATHER;
766   extra->send_size = sendcount;
767   extra->recv_size= recvcount;
768   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
769   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
770   extra->num_processes = MPI_COMM_WORLD->size();
771
772   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
773
774   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
775
776   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
777   log_timed_action (action, clock);
778 }
779
780 static void action_allgatherv(const char *const *action) {
781   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
782         0 allGatherV 275427 275427 275427 275427 204020
783      where:
784         1) 275427 is the sendcount
785         2) The next four elements declare the recvcounts array
786         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
787   */
788   double clock = smpi_process()->simulated_elapsed();
789
790   int comm_size = MPI_COMM_WORLD->size();
791   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
792   int sendcount=atoi(action[2]);
793   int recvcounts[comm_size];
794   int disps[comm_size];
795   int recv_sum=0;
796   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
797
798   if(action[3+comm_size] && action[4+comm_size]) {
799     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
800     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
801   } else
802     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
803
804   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
805
806   for(int i=0;i<comm_size;i++) {
807     recvcounts[i] = atoi(action[i+3]);
808     recv_sum=recv_sum+recvcounts[i];
809     disps[i] = 0;
810   }
811   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
812
813   int rank = smpi_process()->index();
814   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
815   extra->type = TRACING_ALLGATHERV;
816   extra->send_size = sendcount;
817   extra->recvcounts= xbt_new(int, comm_size);
818   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
819     extra->recvcounts[i] = recvcounts[i];
820   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
821   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
822   extra->num_processes = comm_size;
823
824   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
825
826   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
827                           MPI_COMM_WORLD);
828
829   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
830   log_timed_action (action, clock);
831 }
832
833 static void action_allToAllv(const char *const *action) {
834   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
835         0 allToAllV 100 1 7 10 12 100 1 70 10 5
836      where:
837         1) 100 is the size of the send buffer *sizeof(int),
838         2) 1 7 10 12 is the sendcounts array
839         3) 100*sizeof(int) is the size of the receiver buffer
840         4)  1 70 10 5 is the recvcounts array
841   */
842   double clock = smpi_process()->simulated_elapsed();
843
844   int comm_size = MPI_COMM_WORLD->size();
845   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
846   int sendcounts[comm_size];
847   int recvcounts[comm_size];
848   int senddisps[comm_size];
849   int recvdisps[comm_size];
850
851   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
852
853   int send_buf_size=parse_double(action[2]);
854   int recv_buf_size=parse_double(action[3+comm_size]);
855   if(action[4+2*comm_size] && action[5+2*comm_size]) {
856     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
857     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
858   }
859   else
860     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
861
862   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
863   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
864
865   for(int i=0;i<comm_size;i++) {
866     sendcounts[i] = atoi(action[i+3]);
867     recvcounts[i] = atoi(action[i+4+comm_size]);
868     senddisps[i] = 0;
869     recvdisps[i] = 0;
870   }
871
872   int rank = smpi_process()->index();
873   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
874   extra->type = TRACING_ALLTOALLV;
875   extra->recvcounts= xbt_new(int, comm_size);
876   extra->sendcounts= xbt_new(int, comm_size);
877   extra->num_processes = comm_size;
878
879   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
880     extra->send_size += sendcounts[i];
881     extra->sendcounts[i] = sendcounts[i];
882     extra->recv_size += recvcounts[i];
883     extra->recvcounts[i] = recvcounts[i];
884   }
885   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
886   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
887
888   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
889
890   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
891                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
892
893   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
894   log_timed_action (action, clock);
895 }
896
897 void smpi_replay_run(int *argc, char***argv){
898   /* First initializes everything */
899   Process::init(argc, argv);
900   smpi_process()->mark_as_initialized();
901   smpi_process()->set_replaying(true);
902
903   int rank = smpi_process()->index();
904   TRACE_smpi_init(rank);
905   TRACE_smpi_computing_init(rank);
906   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
907   extra->type = TRACING_INIT;
908   char *operation =bprintf("%s_init",__FUNCTION__);
909   TRACE_smpi_collective_in(rank, -1, operation, extra);
910   TRACE_smpi_collective_out(rank, -1, operation);
911   xbt_free(operation);
912   xbt_replay_action_register("init",       action_init);
913   xbt_replay_action_register("finalize",   action_finalize);
914   xbt_replay_action_register("comm_size",  action_comm_size);
915   xbt_replay_action_register("comm_split", action_comm_split);
916   xbt_replay_action_register("comm_dup",   action_comm_dup);
917   xbt_replay_action_register("send",       action_send);
918   xbt_replay_action_register("Isend",      action_Isend);
919   xbt_replay_action_register("recv",       action_recv);
920   xbt_replay_action_register("Irecv",      action_Irecv);
921   xbt_replay_action_register("test",       action_test);
922   xbt_replay_action_register("wait",       action_wait);
923   xbt_replay_action_register("waitAll",    action_waitall);
924   xbt_replay_action_register("barrier",    action_barrier);
925   xbt_replay_action_register("bcast",      action_bcast);
926   xbt_replay_action_register("reduce",     action_reduce);
927   xbt_replay_action_register("allReduce",  action_allReduce);
928   xbt_replay_action_register("allToAll",   action_allToAll);
929   xbt_replay_action_register("allToAllV",  action_allToAllv);
930   xbt_replay_action_register("gather",  action_gather);
931   xbt_replay_action_register("gatherV",  action_gatherv);
932   xbt_replay_action_register("allGather",  action_allgather);
933   xbt_replay_action_register("allGatherV",  action_allgatherv);
934   xbt_replay_action_register("reduceScatter",  action_reducescatter);
935   xbt_replay_action_register("compute",    action_compute);
936
937   //if we have a delayed start, sleep here.
938   if(*argc>2){
939     char *endptr;
940     double value = strtod((*argv)[2], &endptr);
941     if (*endptr != '\0')
942       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
943     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
944     smpi_execute_flops(value);
945   } else {
946     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
947     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
948     smpi_execute_flops(0.0);
949   }
950
951   /* Actually run the replay */
952   simgrid::xbt::replay_runner(*argc, *argv);
953
954   /* and now, finalize everything */
955   /* One active process will stop. Decrease the counter*/
956   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
957   if (!get_reqq_self()->empty()){
958     unsigned int count_requests=get_reqq_self()->size();
959     MPI_Request requests[count_requests];
960     MPI_Status status[count_requests];
961     unsigned int i=0;
962
963     for (auto req: *get_reqq_self()){
964       requests[i] = req;
965       i++;
966     }
967     Request::waitall(count_requests, requests, status);
968   }
969   delete get_reqq_self();
970   active_processes--;
971
972   if(active_processes==0){
973     /* Last process alive speaking: end the simulated timer */
974     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
975     xbt_free(sendbuffer);
976     xbt_free(recvbuffer);
977   }
978
979   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
980   extra_fin->type = TRACING_FINALIZE;
981   operation =bprintf("%s_finalize",__FUNCTION__);
982   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
983
984   smpi_process()->finalize();
985
986   TRACE_smpi_collective_out(rank, -1, operation);
987   TRACE_smpi_finalize(smpi_process()->index());
988   smpi_process()->destroy();
989   xbt_free(operation);
990 }