Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
loudly fail when asked to replay a non-existing file
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.h"
7 #include "xbt/replay.hpp"
8 #include <unordered_map>
9 #include <vector>
10
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static int sendbuffer_size=0;
23 char* sendbuffer=nullptr;
24 static int recvbuffer_size=0;
25 char* recvbuffer=nullptr;
26
27 static void log_timed_action (const char *const *action, double clock){
28   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
29     char *name = xbt_str_join_array(action, " ");
30     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
31     xbt_free(name);
32   }
33 }
34
35 static std::vector<MPI_Request>* get_reqq_self()
36 {
37   return reqq.at(smpi_process()->index());
38 }
39
40 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
41 {
42    reqq.insert({smpi_process()->index(), mpi_request});
43 }
44
45 //allocate a single buffer for all sends, growing it if needed
46 void* smpi_get_tmp_sendbuffer(int size)
47 {
48   if (!smpi_process()->replaying())
49     return xbt_malloc(size);
50   if (sendbuffer_size<size){
51     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
52     sendbuffer_size=size;
53   }
54   return sendbuffer;
55 }
56
57 //allocate a single buffer for all recv
58 void* smpi_get_tmp_recvbuffer(int size){
59   if (!smpi_process()->replaying())
60     return xbt_malloc(size);
61   if (recvbuffer_size<size){
62     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
63     recvbuffer_size=size;
64   }
65   return recvbuffer;
66 }
67
68 void smpi_free_tmp_buffer(void* buf){
69   if (!smpi_process()->replaying())
70     xbt_free(buf);
71 }
72
73 /* Helper function */
74 static double parse_double(const char *string)
75 {
76   char *endptr;
77   double value = strtod(string, &endptr);
78   if (*endptr != '\0')
79     THROWF(unknown_error, 0, "%s is not a double", string);
80   return value;
81 }
82
83 static MPI_Datatype decode_datatype(const char *const action)
84 {
85   switch(atoi(action)) {
86     case 0:
87       MPI_CURRENT_TYPE=MPI_DOUBLE;
88       break;
89     case 1:
90       MPI_CURRENT_TYPE=MPI_INT;
91       break;
92     case 2:
93       MPI_CURRENT_TYPE=MPI_CHAR;
94       break;
95     case 3:
96       MPI_CURRENT_TYPE=MPI_SHORT;
97       break;
98     case 4:
99       MPI_CURRENT_TYPE=MPI_LONG;
100       break;
101     case 5:
102       MPI_CURRENT_TYPE=MPI_FLOAT;
103       break;
104     case 6:
105       MPI_CURRENT_TYPE=MPI_BYTE;
106       break;
107     default:
108       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
109       break;
110   }
111    return MPI_CURRENT_TYPE;
112 }
113
114 const char* encode_datatype(MPI_Datatype datatype, int* known)
115 {
116   //default type for output is set to MPI_BYTE
117   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
118   if(known!=nullptr)
119     *known=1;
120   if (datatype==MPI_BYTE)
121       return "";
122   if(datatype==MPI_DOUBLE)
123       return "0";
124   if(datatype==MPI_INT)
125       return "1";
126   if(datatype==MPI_CHAR)
127       return "2";
128   if(datatype==MPI_SHORT)
129       return "3";
130   if(datatype==MPI_LONG)
131     return "4";
132   if(datatype==MPI_FLOAT)
133       return "5";
134   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
135   if(known!=nullptr)
136     *known=0;
137   // default - not implemented.
138   // do not warn here as we pass in this function even for other trace formats
139   return "-1";
140 }
141
142 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
143     int i=0;\
144     while(action[i]!=nullptr)\
145      i++;\
146     if(i<mandatory+2)                                           \
147     THROWF(arg_error, 0, "%s replay failed.\n" \
148           "%d items were given on the line. First two should be process_id and action.  " \
149           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
150           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
151   }
152
153 namespace simgrid {
154 namespace smpi {
155
156 static void action_init(const char *const *action)
157 {
158   XBT_DEBUG("Initialize the counters");
159   CHECK_ACTION_PARAMS(action, 0, 1)
160   if(action[2]) 
161     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
162   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
163
164   /* start a simulated timer */
165   smpi_process()->simulated_start();
166   /*initialize the number of active processes */
167   active_processes = smpi_process_count();
168
169   set_reqq_self(new std::vector<MPI_Request>);
170 }
171
172 static void action_finalize(const char *const *action)
173 {
174   /* Nothing to do */
175 }
176
177 static void action_comm_size(const char *const *action)
178 {
179   communicator_size = parse_double(action[2]);
180   log_timed_action (action, smpi_process()->simulated_elapsed());
181 }
182
183 static void action_comm_split(const char *const *action)
184 {
185   log_timed_action (action, smpi_process()->simulated_elapsed());
186 }
187
188 static void action_comm_dup(const char *const *action)
189 {
190   log_timed_action (action, smpi_process()->simulated_elapsed());
191 }
192
193 static void action_compute(const char *const *action)
194 {
195   CHECK_ACTION_PARAMS(action, 1, 0)
196   double clock = smpi_process()->simulated_elapsed();
197   double flops= parse_double(action[2]);
198   int rank = smpi_process()->index();
199   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
200   extra->type=TRACING_COMPUTING;
201   extra->comp_size=flops;
202   TRACE_smpi_computing_in(rank, extra);
203
204   smpi_execute_flops(flops);
205
206   TRACE_smpi_computing_out(rank);
207   log_timed_action (action, clock);
208 }
209
210 static void action_send(const char *const *action)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   int to = atoi(action[2]);
214   double size=parse_double(action[3]);
215   double clock = smpi_process()->simulated_elapsed();
216
217   if(action[4])
218     MPI_CURRENT_TYPE=decode_datatype(action[4]);
219   else
220     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
221
222   int rank = smpi_process()->index();
223
224   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
225   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
226   extra->type = TRACING_SEND;
227   extra->send_size = size;
228   extra->src = rank;
229   extra->dst = dst_traced;
230   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
231   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
232   if (!TRACE_smpi_view_internals())
233     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
234
235   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
236
237   log_timed_action (action, clock);
238
239   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
240 }
241
242 static void action_Isend(const char *const *action)
243 {
244   CHECK_ACTION_PARAMS(action, 2, 1)
245   int to = atoi(action[2]);
246   double size=parse_double(action[3]);
247   double clock = smpi_process()->simulated_elapsed();
248
249   if(action[4]) 
250     MPI_CURRENT_TYPE=decode_datatype(action[4]);
251   else 
252     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
253
254   int rank = smpi_process()->index();
255   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
256   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
257   extra->type = TRACING_ISEND;
258   extra->send_size = size;
259   extra->src = rank;
260   extra->dst = dst_traced;
261   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
262   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
263   if (!TRACE_smpi_view_internals())
264     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
265
266   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
267
268   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
269
270   get_reqq_self()->push_back(request);
271
272   log_timed_action (action, clock);
273 }
274
275 static void action_recv(const char *const *action) {
276   CHECK_ACTION_PARAMS(action, 2, 1)
277   int from = atoi(action[2]);
278   double size=parse_double(action[3]);
279   double clock = smpi_process()->simulated_elapsed();
280   MPI_Status status;
281
282   if(action[4]) 
283     MPI_CURRENT_TYPE=decode_datatype(action[4]);
284   else 
285     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286
287   int rank = smpi_process()->index();
288   int src_traced = MPI_COMM_WORLD->group()->rank(from);
289
290   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291   extra->type = TRACING_RECV;
292   extra->send_size = size;
293   extra->src = src_traced;
294   extra->dst = rank;
295   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
296   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
297
298   //unknown size from the receiver point of view
299   if(size<=0.0){
300     Request::probe(from, 0, MPI_COMM_WORLD, &status);
301     size=status.count;
302   }
303
304   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
305
306   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
307   if (!TRACE_smpi_view_internals()) {
308     TRACE_smpi_recv(rank, src_traced, rank, 0);
309   }
310
311   log_timed_action (action, clock);
312 }
313
314 static void action_Irecv(const char *const *action)
315 {
316   CHECK_ACTION_PARAMS(action, 2, 1)
317   int from = atoi(action[2]);
318   double size=parse_double(action[3]);
319   double clock = smpi_process()->simulated_elapsed();
320
321   if(action[4]) 
322     MPI_CURRENT_TYPE=decode_datatype(action[4]);
323   else 
324     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
325
326   int rank = smpi_process()->index();
327   int src_traced = MPI_COMM_WORLD->group()->rank(from);
328   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
329   extra->type = TRACING_IRECV;
330   extra->send_size = size;
331   extra->src = src_traced;
332   extra->dst = rank;
333   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
334   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
335   MPI_Status status;
336   //unknow size from the receiver pov
337   if(size<=0.0){
338       Request::probe(from, 0, MPI_COMM_WORLD, &status);
339       size=status.count;
340   }
341
342   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
343
344   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
345   get_reqq_self()->push_back(request);
346
347   log_timed_action (action, clock);
348 }
349
350 static void action_test(const char *const *action){
351   CHECK_ACTION_PARAMS(action, 0, 0)
352   double clock = smpi_process()->simulated_elapsed();
353   MPI_Status status;
354
355   MPI_Request request = get_reqq_self()->back();
356   get_reqq_self()->pop_back();
357   //if request is null here, this may mean that a previous test has succeeded 
358   //Different times in traced application and replayed version may lead to this 
359   //In this case, ignore the extra calls.
360   if(request!=nullptr){
361     int rank = smpi_process()->index();
362     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
363     extra->type=TRACING_TEST;
364     TRACE_smpi_testing_in(rank, extra);
365
366     int flag = Request::test(&request, &status);
367
368     XBT_DEBUG("MPI_Test result: %d", flag);
369     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
370     get_reqq_self()->push_back(request);
371
372     TRACE_smpi_testing_out(rank);
373   }
374   log_timed_action (action, clock);
375 }
376
377 static void action_wait(const char *const *action){
378   CHECK_ACTION_PARAMS(action, 0, 0)
379   double clock = smpi_process()->simulated_elapsed();
380   MPI_Status status;
381
382   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
383       xbt_str_join_array(action," "));
384   MPI_Request request = get_reqq_self()->back();
385   get_reqq_self()->pop_back();
386
387   if (request==nullptr){
388     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
389     return;
390   }
391
392   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
393
394   MPI_Group group = request->comm()->group();
395   int src_traced = group->rank(request->src());
396   int dst_traced = group->rank(request->dst());
397   int is_wait_for_receive = (request->flags() & RECV);
398   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
399   extra->type = TRACING_WAIT;
400   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
401
402   Request::wait(&request, &status);
403
404   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
405   if (is_wait_for_receive)
406     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
407   log_timed_action (action, clock);
408 }
409
410 static void action_waitall(const char *const *action){
411   CHECK_ACTION_PARAMS(action, 0, 0)
412   double clock = smpi_process()->simulated_elapsed();
413   unsigned int count_requests=get_reqq_self()->size();
414
415   if (count_requests>0) {
416     MPI_Status status[count_requests];
417
418    int rank_traced = smpi_process()->index();
419    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
420    extra->type = TRACING_WAITALL;
421    extra->send_size=count_requests;
422    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
423    int recvs_snd[count_requests];
424    int recvs_rcv[count_requests];
425    unsigned int i=0;
426    for (auto req : *(get_reqq_self())){
427      if (req && (req->flags () & RECV)){
428        recvs_snd[i]=req->src();
429        recvs_rcv[i]=req->dst();
430      }else
431        recvs_snd[i]=-100;
432      i++;
433    }
434    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
435
436    for (i=0; i<count_requests;i++){
437      if (recvs_snd[i]!=-100)
438        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
439    }
440    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
441   }
442   log_timed_action (action, clock);
443 }
444
445 static void action_barrier(const char *const *action){
446   double clock = smpi_process()->simulated_elapsed();
447   int rank = smpi_process()->index();
448   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
449   extra->type = TRACING_BARRIER;
450   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
451
452   Colls::barrier(MPI_COMM_WORLD);
453
454   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
455   log_timed_action (action, clock);
456 }
457
458 static void action_bcast(const char *const *action)
459 {
460   CHECK_ACTION_PARAMS(action, 1, 2)
461   double size = parse_double(action[2]);
462   double clock = smpi_process()->simulated_elapsed();
463   int root=0;
464   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
465   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
466
467   if(action[3]) {
468     root= atoi(action[3]);
469     if(action[4])
470       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
471   }
472
473   int rank = smpi_process()->index();
474   int root_traced = MPI_COMM_WORLD->group()->index(root);
475
476   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
477   extra->type = TRACING_BCAST;
478   extra->send_size = size;
479   extra->root = root_traced;
480   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
481   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
482   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
483
484   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
485
486   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
487   log_timed_action (action, clock);
488 }
489
490 static void action_reduce(const char *const *action)
491 {
492   CHECK_ACTION_PARAMS(action, 2, 2)
493   double comm_size = parse_double(action[2]);
494   double comp_size = parse_double(action[3]);
495   double clock = smpi_process()->simulated_elapsed();
496   int root=0;
497   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498
499   if(action[4]) {
500     root= atoi(action[4]);
501     if(action[5])
502       MPI_CURRENT_TYPE=decode_datatype(action[5]);
503   }
504
505   int rank = smpi_process()->index();
506   int root_traced = MPI_COMM_WORLD->group()->rank(root);
507   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
508   extra->type = TRACING_REDUCE;
509   extra->send_size = comm_size;
510   extra->comp_size = comp_size;
511   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
512   extra->root = root_traced;
513
514   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
515
516   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
517   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
518   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
519   smpi_execute_flops(comp_size);
520
521   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
522   log_timed_action (action, clock);
523 }
524
525 static void action_allReduce(const char *const *action) {
526   CHECK_ACTION_PARAMS(action, 2, 1)
527   double comm_size = parse_double(action[2]);
528   double comp_size = parse_double(action[3]);
529
530   if(action[4])
531     MPI_CURRENT_TYPE=decode_datatype(action[4]);
532   else
533     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
534
535   double clock = smpi_process()->simulated_elapsed();
536   int rank = smpi_process()->index();
537   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538   extra->type = TRACING_ALLREDUCE;
539   extra->send_size = comm_size;
540   extra->comp_size = comp_size;
541   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
542   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
543
544   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
545   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
546   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
547   smpi_execute_flops(comp_size);
548
549   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
550   log_timed_action (action, clock);
551 }
552
553 static void action_allToAll(const char *const *action) {
554   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
555   double clock = smpi_process()->simulated_elapsed();
556   int comm_size = MPI_COMM_WORLD->size();
557   int send_size = parse_double(action[2]);
558   int recv_size = parse_double(action[3]);
559   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
560
561   if(action[4] && action[5]) {
562     MPI_CURRENT_TYPE=decode_datatype(action[4]);
563     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
564   }
565   else
566     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
567
568   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
569   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
570
571   int rank = smpi_process()->index();
572   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573   extra->type = TRACING_ALLTOALL;
574   extra->send_size = send_size;
575   extra->recv_size = recv_size;
576   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
577   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
578
579   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
580
581   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
582
583   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
584   log_timed_action (action, clock);
585 }
586
587 static void action_gather(const char *const *action) {
588   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
589         0 gather 68 68 0 0 0
590       where:
591         1) 68 is the sendcounts
592         2) 68 is the recvcounts
593         3) 0 is the root node
594         4) 0 is the send datatype id, see decode_datatype()
595         5) 0 is the recv datatype id, see decode_datatype()
596   */
597   CHECK_ACTION_PARAMS(action, 2, 3)
598   double clock = smpi_process()->simulated_elapsed();
599   int comm_size = MPI_COMM_WORLD->size();
600   int send_size = parse_double(action[2]);
601   int recv_size = parse_double(action[3]);
602   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
603   if(action[4] && action[5]) {
604     MPI_CURRENT_TYPE=decode_datatype(action[5]);
605     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
606   } else {
607     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
608   }
609   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
610   void *recv = nullptr;
611   int root=0;
612   if(action[4])
613     root=atoi(action[4]);
614   int rank = MPI_COMM_WORLD->rank();
615
616   if(rank==root)
617     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
618
619   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
620   extra->type = TRACING_GATHER;
621   extra->send_size = send_size;
622   extra->recv_size = recv_size;
623   extra->root = root;
624   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
625   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
626
627   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
628
629   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
630
631   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
632   log_timed_action (action, clock);
633 }
634
635 static void action_gatherv(const char *const *action) {
636   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
637        0 gather 68 68 10 10 10 0 0 0
638      where:
639        1) 68 is the sendcount
640        2) 68 10 10 10 is the recvcounts
641        3) 0 is the root node
642        4) 0 is the send datatype id, see decode_datatype()
643        5) 0 is the recv datatype id, see decode_datatype()
644   */
645   double clock = smpi_process()->simulated_elapsed();
646   int comm_size = MPI_COMM_WORLD->size();
647   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
648   int send_size = parse_double(action[2]);
649   int disps[comm_size];
650   int recvcounts[comm_size];
651   int recv_sum=0;
652
653   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
654   if(action[4+comm_size] && action[5+comm_size]) {
655     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
656     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
657   } else
658     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
659
660   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
661   void *recv = nullptr;
662   for(int i=0;i<comm_size;i++) {
663     recvcounts[i] = atoi(action[i+3]);
664     recv_sum=recv_sum+recvcounts[i];
665     disps[i]=0;
666   }
667
668   int root=atoi(action[3+comm_size]);
669   int rank = MPI_COMM_WORLD->rank();
670
671   if(rank==root)
672     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
673
674   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
675   extra->type = TRACING_GATHERV;
676   extra->send_size = send_size;
677   extra->recvcounts= xbt_new(int,comm_size);
678   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
679     extra->recvcounts[i] = recvcounts[i];
680   extra->root = root;
681   extra->num_processes = comm_size;
682   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
683   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
684
685   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
686
687   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
688
689   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
690   log_timed_action (action, clock);
691 }
692
693 static void action_reducescatter(const char *const *action) {
694  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
695       0 reduceScatter 275427 275427 275427 204020 11346849 0
696     where:
697       1) The first four values after the name of the action declare the recvcounts array
698       2) The value 11346849 is the amount of instructions
699       3) The last value corresponds to the datatype, see decode_datatype().
700 */
701   double clock = smpi_process()->simulated_elapsed();
702   int comm_size = MPI_COMM_WORLD->size();
703   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
704   int comp_size = parse_double(action[2+comm_size]);
705   int recvcounts[comm_size];
706   int rank = smpi_process()->index();
707   int size = 0;
708   if(action[3+comm_size])
709     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
710   else
711     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
712
713   for(int i=0;i<comm_size;i++) {
714     recvcounts[i] = atoi(action[i+2]);
715     size+=recvcounts[i];
716   }
717
718   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
719   extra->type = TRACING_REDUCE_SCATTER;
720   extra->send_size = 0;
721   extra->recvcounts= xbt_new(int, comm_size);
722   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
723     extra->recvcounts[i] = recvcounts[i];
724   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
725   extra->comp_size = comp_size;
726   extra->num_processes = comm_size;
727
728   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
729
730   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
731   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
732
733   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
734   smpi_execute_flops(comp_size);
735
736   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
737   log_timed_action (action, clock);
738 }
739
740 static void action_allgather(const char *const *action) {
741   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
742         0 allGather 275427 275427
743     where:
744         1) 275427 is the sendcount
745         2) 275427 is the recvcount
746         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
747   */
748   double clock = smpi_process()->simulated_elapsed();
749
750   CHECK_ACTION_PARAMS(action, 2, 2)
751   int sendcount=atoi(action[2]); 
752   int recvcount=atoi(action[3]); 
753
754   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
755
756   if(action[4] && action[5]) {
757     MPI_CURRENT_TYPE = decode_datatype(action[4]);
758     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
759   } else
760     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
761
762   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
763   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
764
765   int rank = smpi_process()->index();
766   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
767   extra->type = TRACING_ALLGATHER;
768   extra->send_size = sendcount;
769   extra->recv_size= recvcount;
770   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
771   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
772   extra->num_processes = MPI_COMM_WORLD->size();
773
774   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
775
776   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
777
778   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
779   log_timed_action (action, clock);
780 }
781
782 static void action_allgatherv(const char *const *action) {
783   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
784         0 allGatherV 275427 275427 275427 275427 204020
785      where:
786         1) 275427 is the sendcount
787         2) The next four elements declare the recvcounts array
788         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
789   */
790   double clock = smpi_process()->simulated_elapsed();
791
792   int comm_size = MPI_COMM_WORLD->size();
793   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
794   int sendcount=atoi(action[2]);
795   int recvcounts[comm_size];
796   int disps[comm_size];
797   int recv_sum=0;
798   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
799
800   if(action[3+comm_size] && action[4+comm_size]) {
801     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
802     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
803   } else
804     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
805
806   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
807
808   for(int i=0;i<comm_size;i++) {
809     recvcounts[i] = atoi(action[i+3]);
810     recv_sum=recv_sum+recvcounts[i];
811     disps[i] = 0;
812   }
813   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
814
815   int rank = smpi_process()->index();
816   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
817   extra->type = TRACING_ALLGATHERV;
818   extra->send_size = sendcount;
819   extra->recvcounts= xbt_new(int, comm_size);
820   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
821     extra->recvcounts[i] = recvcounts[i];
822   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
823   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
824   extra->num_processes = comm_size;
825
826   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
827
828   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
829                           MPI_COMM_WORLD);
830
831   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
832   log_timed_action (action, clock);
833 }
834
835 static void action_allToAllv(const char *const *action) {
836   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
837         0 allToAllV 100 1 7 10 12 100 1 70 10 5
838      where:
839         1) 100 is the size of the send buffer *sizeof(int),
840         2) 1 7 10 12 is the sendcounts array
841         3) 100*sizeof(int) is the size of the receiver buffer
842         4)  1 70 10 5 is the recvcounts array
843   */
844   double clock = smpi_process()->simulated_elapsed();
845
846   int comm_size = MPI_COMM_WORLD->size();
847   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
848   int sendcounts[comm_size];
849   int recvcounts[comm_size];
850   int senddisps[comm_size];
851   int recvdisps[comm_size];
852
853   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
854
855   int send_buf_size=parse_double(action[2]);
856   int recv_buf_size=parse_double(action[3+comm_size]);
857   if(action[4+2*comm_size] && action[5+2*comm_size]) {
858     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
859     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
860   }
861   else
862     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
863
864   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
865   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
866
867   for(int i=0;i<comm_size;i++) {
868     sendcounts[i] = atoi(action[i+3]);
869     recvcounts[i] = atoi(action[i+4+comm_size]);
870     senddisps[i] = 0;
871     recvdisps[i] = 0;
872   }
873
874   int rank = smpi_process()->index();
875   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
876   extra->type = TRACING_ALLTOALLV;
877   extra->recvcounts= xbt_new(int, comm_size);
878   extra->sendcounts= xbt_new(int, comm_size);
879   extra->num_processes = comm_size;
880
881   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
882     extra->send_size += sendcounts[i];
883     extra->sendcounts[i] = sendcounts[i];
884     extra->recv_size += recvcounts[i];
885     extra->recvcounts[i] = recvcounts[i];
886   }
887   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
888   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
889
890   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
891
892   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
893                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
894
895   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
896   log_timed_action (action, clock);
897 }
898
899 }} // namespace simgrid::smpi
900
901 void smpi_replay_run(int *argc, char***argv){
902   /* First initializes everything */
903   simgrid::smpi::Process::init(argc, argv);
904   smpi_process()->mark_as_initialized();
905   smpi_process()->set_replaying(true);
906
907   int rank = smpi_process()->index();
908   TRACE_smpi_init(rank);
909   TRACE_smpi_computing_init(rank);
910   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
911   extra->type = TRACING_INIT;
912   char *operation =bprintf("%s_init",__FUNCTION__);
913   TRACE_smpi_collective_in(rank, -1, operation, extra);
914   TRACE_smpi_collective_out(rank, -1, operation);
915   xbt_free(operation);
916   xbt_replay_action_register("init",       simgrid::smpi::action_init);
917   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
918   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
919   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
920   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
921   xbt_replay_action_register("send",       simgrid::smpi::action_send);
922   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
923   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
924   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
925   xbt_replay_action_register("test",       simgrid::smpi::action_test);
926   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
927   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
928   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
929   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
930   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
931   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
932   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
933   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
934   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
935   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
936   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
937   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
938   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
939   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
940
941   //if we have a delayed start, sleep here.
942   if(*argc>2){
943     char *endptr;
944     double value = strtod((*argv)[2], &endptr);
945     if (*endptr != '\0')
946       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
947     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
948     smpi_execute_flops(value);
949   } else {
950     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
951     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
952     smpi_execute_flops(0.0);
953   }
954
955   /* Actually run the replay */
956   simgrid::xbt::replay_runner(*argc, *argv);
957
958   /* and now, finalize everything */
959   /* One active process will stop. Decrease the counter*/
960   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
961   if (!get_reqq_self()->empty()){
962     unsigned int count_requests=get_reqq_self()->size();
963     MPI_Request requests[count_requests];
964     MPI_Status status[count_requests];
965     unsigned int i=0;
966
967     for (auto req: *get_reqq_self()){
968       requests[i] = req;
969       i++;
970     }
971     simgrid::smpi::Request::waitall(count_requests, requests, status);
972   }
973   delete get_reqq_self();
974   active_processes--;
975
976   if(active_processes==0){
977     /* Last process alive speaking: end the simulated timer */
978     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
979     xbt_free(sendbuffer);
980     xbt_free(recvbuffer);
981   }
982
983   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
984   extra_fin->type = TRACING_FINALIZE;
985   operation =bprintf("%s_finalize",__FUNCTION__);
986   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
987
988   smpi_process()->finalize();
989
990   TRACE_smpi_collective_out(rank, -1, operation);
991   TRACE_smpi_finalize(smpi_process()->index());
992   xbt_free(operation);
993 }