Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
reduce the amount of includes
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/smpi/smpi_coll.hpp"
7 #include "src/smpi/smpi_comm.hpp"
8 #include "src/smpi/smpi_datatype.hpp"
9 #include "src/smpi/smpi_group.hpp"
10 #include "src/smpi/smpi_process.hpp"
11 #include "src/smpi/smpi_request.hpp"
12 #include "xbt/replay.hpp"
13
14 #include <unordered_map>
15 #include <vector>
16
17 #define KEY_SIZE (sizeof(int) * 2 + 1)
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
20
21 int communicator_size = 0;
22 static int active_processes = 0;
23 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
24
25 MPI_Datatype MPI_DEFAULT_TYPE;
26 MPI_Datatype MPI_CURRENT_TYPE;
27
28 static int sendbuffer_size=0;
29 char* sendbuffer=nullptr;
30 static int recvbuffer_size=0;
31 char* recvbuffer=nullptr;
32
33 static void log_timed_action (const char *const *action, double clock){
34   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
35     char *name = xbt_str_join_array(action, " ");
36     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
37     xbt_free(name);
38   }
39 }
40
41 static std::vector<MPI_Request>* get_reqq_self()
42 {
43   return reqq.at(smpi_process()->index());
44 }
45
46 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
47 {
48    reqq.insert({smpi_process()->index(), mpi_request});
49 }
50
51 //allocate a single buffer for all sends, growing it if needed
52 void* smpi_get_tmp_sendbuffer(int size)
53 {
54   if (!smpi_process()->replaying())
55     return xbt_malloc(size);
56   if (sendbuffer_size<size){
57     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
58     sendbuffer_size=size;
59   }
60   return sendbuffer;
61 }
62
63 //allocate a single buffer for all recv
64 void* smpi_get_tmp_recvbuffer(int size){
65   if (!smpi_process()->replaying())
66     return xbt_malloc(size);
67   if (recvbuffer_size<size){
68     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
69     recvbuffer_size=size;
70   }
71   return recvbuffer;
72 }
73
74 void smpi_free_tmp_buffer(void* buf){
75   if (!smpi_process()->replaying())
76     xbt_free(buf);
77 }
78
79 /* Helper function */
80 static double parse_double(const char *string)
81 {
82   char *endptr;
83   double value = strtod(string, &endptr);
84   if (*endptr != '\0')
85     THROWF(unknown_error, 0, "%s is not a double", string);
86   return value;
87 }
88
89 static MPI_Datatype decode_datatype(const char *const action)
90 {
91   switch(atoi(action)) {
92     case 0:
93       MPI_CURRENT_TYPE=MPI_DOUBLE;
94       break;
95     case 1:
96       MPI_CURRENT_TYPE=MPI_INT;
97       break;
98     case 2:
99       MPI_CURRENT_TYPE=MPI_CHAR;
100       break;
101     case 3:
102       MPI_CURRENT_TYPE=MPI_SHORT;
103       break;
104     case 4:
105       MPI_CURRENT_TYPE=MPI_LONG;
106       break;
107     case 5:
108       MPI_CURRENT_TYPE=MPI_FLOAT;
109       break;
110     case 6:
111       MPI_CURRENT_TYPE=MPI_BYTE;
112       break;
113     default:
114       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
115   }
116    return MPI_CURRENT_TYPE;
117 }
118
119 const char* encode_datatype(MPI_Datatype datatype, int* known)
120 {
121   //default type for output is set to MPI_BYTE
122   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
123   if(known!=nullptr)
124     *known=1;
125   if (datatype==MPI_BYTE)
126       return "";
127   if(datatype==MPI_DOUBLE)
128       return "0";
129   if(datatype==MPI_INT)
130       return "1";
131   if(datatype==MPI_CHAR)
132       return "2";
133   if(datatype==MPI_SHORT)
134       return "3";
135   if(datatype==MPI_LONG)
136     return "4";
137   if(datatype==MPI_FLOAT)
138       return "5";
139   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
140   if(known!=nullptr)
141     *known=0;
142   // default - not implemented.
143   // do not warn here as we pass in this function even for other trace formats
144   return "-1";
145 }
146
147 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
148     int i=0;\
149     while(action[i]!=nullptr)\
150      i++;\
151     if(i<mandatory+2)                                           \
152     THROWF(arg_error, 0, "%s replay failed.\n" \
153           "%d items were given on the line. First two should be process_id and action.  " \
154           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
155           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
156   }
157
158 namespace simgrid {
159 namespace smpi {
160
161 static void action_init(const char *const *action)
162 {
163   XBT_DEBUG("Initialize the counters");
164   CHECK_ACTION_PARAMS(action, 0, 1)
165   if(action[2]) 
166     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
167   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
168
169   /* start a simulated timer */
170   smpi_process()->simulated_start();
171   /*initialize the number of active processes */
172   active_processes = smpi_process_count();
173
174   set_reqq_self(new std::vector<MPI_Request>);
175 }
176
177 static void action_finalize(const char *const *action)
178 {
179   /* Nothing to do */
180 }
181
182 static void action_comm_size(const char *const *action)
183 {
184   communicator_size = parse_double(action[2]);
185   log_timed_action (action, smpi_process()->simulated_elapsed());
186 }
187
188 static void action_comm_split(const char *const *action)
189 {
190   log_timed_action (action, smpi_process()->simulated_elapsed());
191 }
192
193 static void action_comm_dup(const char *const *action)
194 {
195   log_timed_action (action, smpi_process()->simulated_elapsed());
196 }
197
198 static void action_compute(const char *const *action)
199 {
200   CHECK_ACTION_PARAMS(action, 1, 0)
201   double clock = smpi_process()->simulated_elapsed();
202   double flops= parse_double(action[2]);
203   int rank = smpi_process()->index();
204   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
205   extra->type=TRACING_COMPUTING;
206   extra->comp_size=flops;
207   TRACE_smpi_computing_in(rank, extra);
208
209   smpi_execute_flops(flops);
210
211   TRACE_smpi_computing_out(rank);
212   log_timed_action (action, clock);
213 }
214
215 static void action_send(const char *const *action)
216 {
217   CHECK_ACTION_PARAMS(action, 2, 1)
218   int to = atoi(action[2]);
219   double size=parse_double(action[3]);
220   double clock = smpi_process()->simulated_elapsed();
221
222   if(action[4])
223     MPI_CURRENT_TYPE=decode_datatype(action[4]);
224   else
225     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
226
227   int rank = smpi_process()->index();
228
229   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
230   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
231   extra->type = TRACING_SEND;
232   extra->send_size = size;
233   extra->src = rank;
234   extra->dst = dst_traced;
235   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
236   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
237   if (!TRACE_smpi_view_internals())
238     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
239
240   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
241
242   log_timed_action (action, clock);
243
244   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
245 }
246
247 static void action_Isend(const char *const *action)
248 {
249   CHECK_ACTION_PARAMS(action, 2, 1)
250   int to = atoi(action[2]);
251   double size=parse_double(action[3]);
252   double clock = smpi_process()->simulated_elapsed();
253
254   if(action[4]) 
255     MPI_CURRENT_TYPE=decode_datatype(action[4]);
256   else 
257     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
258
259   int rank = smpi_process()->index();
260   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
261   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
262   extra->type = TRACING_ISEND;
263   extra->send_size = size;
264   extra->src = rank;
265   extra->dst = dst_traced;
266   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
267   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
268   if (!TRACE_smpi_view_internals())
269     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
270
271   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
272
273   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
274
275   get_reqq_self()->push_back(request);
276
277   log_timed_action (action, clock);
278 }
279
280 static void action_recv(const char *const *action) {
281   CHECK_ACTION_PARAMS(action, 2, 1)
282   int from = atoi(action[2]);
283   double size=parse_double(action[3]);
284   double clock = smpi_process()->simulated_elapsed();
285   MPI_Status status;
286
287   if(action[4]) 
288     MPI_CURRENT_TYPE=decode_datatype(action[4]);
289   else 
290     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
291
292   int rank = smpi_process()->index();
293   int src_traced = MPI_COMM_WORLD->group()->rank(from);
294
295   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
296   extra->type = TRACING_RECV;
297   extra->send_size = size;
298   extra->src = src_traced;
299   extra->dst = rank;
300   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
301   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
302
303   //unknown size from the receiver point of view
304   if(size<=0.0){
305     Request::probe(from, 0, MPI_COMM_WORLD, &status);
306     size=status.count;
307   }
308
309   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
310
311   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
312   if (!TRACE_smpi_view_internals()) {
313     TRACE_smpi_recv(rank, src_traced, rank, 0);
314   }
315
316   log_timed_action (action, clock);
317 }
318
319 static void action_Irecv(const char *const *action)
320 {
321   CHECK_ACTION_PARAMS(action, 2, 1)
322   int from = atoi(action[2]);
323   double size=parse_double(action[3]);
324   double clock = smpi_process()->simulated_elapsed();
325
326   if(action[4]) 
327     MPI_CURRENT_TYPE=decode_datatype(action[4]);
328   else 
329     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
330
331   int rank = smpi_process()->index();
332   int src_traced = MPI_COMM_WORLD->group()->rank(from);
333   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
334   extra->type = TRACING_IRECV;
335   extra->send_size = size;
336   extra->src = src_traced;
337   extra->dst = rank;
338   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
339   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
340   MPI_Status status;
341   //unknow size from the receiver pov
342   if(size<=0.0){
343       Request::probe(from, 0, MPI_COMM_WORLD, &status);
344       size=status.count;
345   }
346
347   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
348
349   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
350   get_reqq_self()->push_back(request);
351
352   log_timed_action (action, clock);
353 }
354
355 static void action_test(const char *const *action){
356   CHECK_ACTION_PARAMS(action, 0, 0)
357   double clock = smpi_process()->simulated_elapsed();
358   MPI_Status status;
359
360   MPI_Request request = get_reqq_self()->back();
361   get_reqq_self()->pop_back();
362   //if request is null here, this may mean that a previous test has succeeded 
363   //Different times in traced application and replayed version may lead to this 
364   //In this case, ignore the extra calls.
365   if(request!=nullptr){
366     int rank = smpi_process()->index();
367     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
368     extra->type=TRACING_TEST;
369     TRACE_smpi_testing_in(rank, extra);
370
371     int flag = Request::test(&request, &status);
372
373     XBT_DEBUG("MPI_Test result: %d", flag);
374     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
375     get_reqq_self()->push_back(request);
376
377     TRACE_smpi_testing_out(rank);
378   }
379   log_timed_action (action, clock);
380 }
381
382 static void action_wait(const char *const *action){
383   CHECK_ACTION_PARAMS(action, 0, 0)
384   double clock = smpi_process()->simulated_elapsed();
385   MPI_Status status;
386
387   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
388       xbt_str_join_array(action," "));
389   MPI_Request request = get_reqq_self()->back();
390   get_reqq_self()->pop_back();
391
392   if (request==nullptr){
393     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
394     return;
395   }
396
397   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
398
399   MPI_Group group = request->comm()->group();
400   int src_traced = group->rank(request->src());
401   int dst_traced = group->rank(request->dst());
402   int is_wait_for_receive = (request->flags() & RECV);
403   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
404   extra->type = TRACING_WAIT;
405   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
406
407   Request::wait(&request, &status);
408
409   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
410   if (is_wait_for_receive)
411     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
412   log_timed_action (action, clock);
413 }
414
415 static void action_waitall(const char *const *action){
416   CHECK_ACTION_PARAMS(action, 0, 0)
417   double clock = smpi_process()->simulated_elapsed();
418   unsigned int count_requests=get_reqq_self()->size();
419
420   if (count_requests>0) {
421     MPI_Status status[count_requests];
422
423    int rank_traced = smpi_process()->index();
424    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
425    extra->type = TRACING_WAITALL;
426    extra->send_size=count_requests;
427    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
428    int recvs_snd[count_requests];
429    int recvs_rcv[count_requests];
430    unsigned int i=0;
431    for (auto req : *(get_reqq_self())){
432      if (req && (req->flags () & RECV)){
433        recvs_snd[i]=req->src();
434        recvs_rcv[i]=req->dst();
435      }else
436        recvs_snd[i]=-100;
437      i++;
438    }
439    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
440
441    for (i=0; i<count_requests;i++){
442      if (recvs_snd[i]!=-100)
443        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
444    }
445    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
446   }
447   log_timed_action (action, clock);
448 }
449
450 static void action_barrier(const char *const *action){
451   double clock = smpi_process()->simulated_elapsed();
452   int rank = smpi_process()->index();
453   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
454   extra->type = TRACING_BARRIER;
455   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
456
457   Colls::barrier(MPI_COMM_WORLD);
458
459   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
460   log_timed_action (action, clock);
461 }
462
463 static void action_bcast(const char *const *action)
464 {
465   CHECK_ACTION_PARAMS(action, 1, 2)
466   double size = parse_double(action[2]);
467   double clock = smpi_process()->simulated_elapsed();
468   int root=0;
469   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
470   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
471
472   if(action[3]) {
473     root= atoi(action[3]);
474     if(action[4])
475       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
476   }
477
478   int rank = smpi_process()->index();
479   int root_traced = MPI_COMM_WORLD->group()->index(root);
480
481   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
482   extra->type = TRACING_BCAST;
483   extra->send_size = size;
484   extra->root = root_traced;
485   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
486   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
487   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
488
489   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
490
491   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
492   log_timed_action (action, clock);
493 }
494
495 static void action_reduce(const char *const *action)
496 {
497   CHECK_ACTION_PARAMS(action, 2, 2)
498   double comm_size = parse_double(action[2]);
499   double comp_size = parse_double(action[3]);
500   double clock = smpi_process()->simulated_elapsed();
501   int root=0;
502   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
503
504   if(action[4]) {
505     root= atoi(action[4]);
506     if(action[5])
507       MPI_CURRENT_TYPE=decode_datatype(action[5]);
508   }
509
510   int rank = smpi_process()->index();
511   int root_traced = MPI_COMM_WORLD->group()->rank(root);
512   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
513   extra->type = TRACING_REDUCE;
514   extra->send_size = comm_size;
515   extra->comp_size = comp_size;
516   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
517   extra->root = root_traced;
518
519   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
520
521   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
522   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
523   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
524   smpi_execute_flops(comp_size);
525
526   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
527   log_timed_action (action, clock);
528 }
529
530 static void action_allReduce(const char *const *action) {
531   CHECK_ACTION_PARAMS(action, 2, 1)
532   double comm_size = parse_double(action[2]);
533   double comp_size = parse_double(action[3]);
534
535   if(action[4])
536     MPI_CURRENT_TYPE=decode_datatype(action[4]);
537   else
538     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
539
540   double clock = smpi_process()->simulated_elapsed();
541   int rank = smpi_process()->index();
542   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
543   extra->type = TRACING_ALLREDUCE;
544   extra->send_size = comm_size;
545   extra->comp_size = comp_size;
546   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
547   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
548
549   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
550   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
551   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
552   smpi_execute_flops(comp_size);
553
554   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
555   log_timed_action (action, clock);
556 }
557
558 static void action_allToAll(const char *const *action) {
559   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
560   double clock = smpi_process()->simulated_elapsed();
561   int comm_size = MPI_COMM_WORLD->size();
562   int send_size = parse_double(action[2]);
563   int recv_size = parse_double(action[3]);
564   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
565
566   if(action[4] && action[5]) {
567     MPI_CURRENT_TYPE=decode_datatype(action[4]);
568     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
569   }
570   else
571     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
572
573   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
574   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
575
576   int rank = smpi_process()->index();
577   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
578   extra->type = TRACING_ALLTOALL;
579   extra->send_size = send_size;
580   extra->recv_size = recv_size;
581   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
582   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
583
584   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
585
586   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
587
588   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
589   log_timed_action (action, clock);
590 }
591
592 static void action_gather(const char *const *action) {
593   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
594         0 gather 68 68 0 0 0
595       where:
596         1) 68 is the sendcounts
597         2) 68 is the recvcounts
598         3) 0 is the root node
599         4) 0 is the send datatype id, see decode_datatype()
600         5) 0 is the recv datatype id, see decode_datatype()
601   */
602   CHECK_ACTION_PARAMS(action, 2, 3)
603   double clock = smpi_process()->simulated_elapsed();
604   int comm_size = MPI_COMM_WORLD->size();
605   int send_size = parse_double(action[2]);
606   int recv_size = parse_double(action[3]);
607   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
608   if(action[4] && action[5]) {
609     MPI_CURRENT_TYPE=decode_datatype(action[5]);
610     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
611   } else {
612     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
613   }
614   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
615   void *recv = nullptr;
616   int root=0;
617   if(action[4])
618     root=atoi(action[4]);
619   int rank = MPI_COMM_WORLD->rank();
620
621   if(rank==root)
622     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
623
624   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
625   extra->type = TRACING_GATHER;
626   extra->send_size = send_size;
627   extra->recv_size = recv_size;
628   extra->root = root;
629   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
630   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
631
632   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
633
634   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
635
636   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
637   log_timed_action (action, clock);
638 }
639
640 static void action_gatherv(const char *const *action) {
641   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
642        0 gather 68 68 10 10 10 0 0 0
643      where:
644        1) 68 is the sendcount
645        2) 68 10 10 10 is the recvcounts
646        3) 0 is the root node
647        4) 0 is the send datatype id, see decode_datatype()
648        5) 0 is the recv datatype id, see decode_datatype()
649   */
650   double clock = smpi_process()->simulated_elapsed();
651   int comm_size = MPI_COMM_WORLD->size();
652   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
653   int send_size = parse_double(action[2]);
654   int disps[comm_size];
655   int recvcounts[comm_size];
656   int recv_sum=0;
657
658   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
659   if(action[4+comm_size] && action[5+comm_size]) {
660     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
661     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
662   } else
663     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
664
665   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
666   void *recv = nullptr;
667   for(int i=0;i<comm_size;i++) {
668     recvcounts[i] = atoi(action[i+3]);
669     recv_sum=recv_sum+recvcounts[i];
670     disps[i]=0;
671   }
672
673   int root=atoi(action[3+comm_size]);
674   int rank = MPI_COMM_WORLD->rank();
675
676   if(rank==root)
677     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
678
679   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
680   extra->type = TRACING_GATHERV;
681   extra->send_size = send_size;
682   extra->recvcounts= xbt_new(int,comm_size);
683   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
684     extra->recvcounts[i] = recvcounts[i];
685   extra->root = root;
686   extra->num_processes = comm_size;
687   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
688   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
689
690   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
691
692   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
693
694   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
695   log_timed_action (action, clock);
696 }
697
698 static void action_reducescatter(const char *const *action) {
699  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
700       0 reduceScatter 275427 275427 275427 204020 11346849 0
701     where:
702       1) The first four values after the name of the action declare the recvcounts array
703       2) The value 11346849 is the amount of instructions
704       3) The last value corresponds to the datatype, see decode_datatype().
705 */
706   double clock = smpi_process()->simulated_elapsed();
707   int comm_size = MPI_COMM_WORLD->size();
708   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
709   int comp_size = parse_double(action[2+comm_size]);
710   int recvcounts[comm_size];
711   int rank = smpi_process()->index();
712   int size = 0;
713   if(action[3+comm_size])
714     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
715   else
716     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
717
718   for(int i=0;i<comm_size;i++) {
719     recvcounts[i] = atoi(action[i+2]);
720     size+=recvcounts[i];
721   }
722
723   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
724   extra->type = TRACING_REDUCE_SCATTER;
725   extra->send_size = 0;
726   extra->recvcounts= xbt_new(int, comm_size);
727   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
728     extra->recvcounts[i] = recvcounts[i];
729   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
730   extra->comp_size = comp_size;
731   extra->num_processes = comm_size;
732
733   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
734
735   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
736   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
737
738   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
739   smpi_execute_flops(comp_size);
740
741   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
742   log_timed_action (action, clock);
743 }
744
745 static void action_allgather(const char *const *action) {
746   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
747         0 allGather 275427 275427
748     where:
749         1) 275427 is the sendcount
750         2) 275427 is the recvcount
751         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
752   */
753   double clock = smpi_process()->simulated_elapsed();
754
755   CHECK_ACTION_PARAMS(action, 2, 2)
756   int sendcount=atoi(action[2]); 
757   int recvcount=atoi(action[3]); 
758
759   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
760
761   if(action[4] && action[5]) {
762     MPI_CURRENT_TYPE = decode_datatype(action[4]);
763     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
764   } else
765     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
766
767   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
768   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
769
770   int rank = smpi_process()->index();
771   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
772   extra->type = TRACING_ALLGATHER;
773   extra->send_size = sendcount;
774   extra->recv_size= recvcount;
775   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
776   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
777   extra->num_processes = MPI_COMM_WORLD->size();
778
779   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
780
781   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
782
783   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
784   log_timed_action (action, clock);
785 }
786
787 static void action_allgatherv(const char *const *action) {
788   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
789         0 allGatherV 275427 275427 275427 275427 204020
790      where:
791         1) 275427 is the sendcount
792         2) The next four elements declare the recvcounts array
793         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
794   */
795   double clock = smpi_process()->simulated_elapsed();
796
797   int comm_size = MPI_COMM_WORLD->size();
798   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
799   int sendcount=atoi(action[2]);
800   int recvcounts[comm_size];
801   int disps[comm_size];
802   int recv_sum=0;
803   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
804
805   if(action[3+comm_size] && action[4+comm_size]) {
806     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
807     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
808   } else
809     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
810
811   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
812
813   for(int i=0;i<comm_size;i++) {
814     recvcounts[i] = atoi(action[i+3]);
815     recv_sum=recv_sum+recvcounts[i];
816     disps[i] = 0;
817   }
818   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
819
820   int rank = smpi_process()->index();
821   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
822   extra->type = TRACING_ALLGATHERV;
823   extra->send_size = sendcount;
824   extra->recvcounts= xbt_new(int, comm_size);
825   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
826     extra->recvcounts[i] = recvcounts[i];
827   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
828   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
829   extra->num_processes = comm_size;
830
831   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
832
833   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
834                           MPI_COMM_WORLD);
835
836   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
837   log_timed_action (action, clock);
838 }
839
840 static void action_allToAllv(const char *const *action) {
841   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
842         0 allToAllV 100 1 7 10 12 100 1 70 10 5
843      where:
844         1) 100 is the size of the send buffer *sizeof(int),
845         2) 1 7 10 12 is the sendcounts array
846         3) 100*sizeof(int) is the size of the receiver buffer
847         4)  1 70 10 5 is the recvcounts array
848   */
849   double clock = smpi_process()->simulated_elapsed();
850
851   int comm_size = MPI_COMM_WORLD->size();
852   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
853   int sendcounts[comm_size];
854   int recvcounts[comm_size];
855   int senddisps[comm_size];
856   int recvdisps[comm_size];
857
858   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
859
860   int send_buf_size=parse_double(action[2]);
861   int recv_buf_size=parse_double(action[3+comm_size]);
862   if(action[4+2*comm_size] && action[5+2*comm_size]) {
863     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
864     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
865   }
866   else
867     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
868
869   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
870   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
871
872   for(int i=0;i<comm_size;i++) {
873     sendcounts[i] = atoi(action[i+3]);
874     recvcounts[i] = atoi(action[i+4+comm_size]);
875     senddisps[i] = 0;
876     recvdisps[i] = 0;
877   }
878
879   int rank = smpi_process()->index();
880   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
881   extra->type = TRACING_ALLTOALLV;
882   extra->recvcounts= xbt_new(int, comm_size);
883   extra->sendcounts= xbt_new(int, comm_size);
884   extra->num_processes = comm_size;
885
886   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
887     extra->send_size += sendcounts[i];
888     extra->sendcounts[i] = sendcounts[i];
889     extra->recv_size += recvcounts[i];
890     extra->recvcounts[i] = recvcounts[i];
891   }
892   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
893   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
894
895   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
896
897   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
898                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
899
900   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
901   log_timed_action (action, clock);
902 }
903
904 }} // namespace simgrid::smpi
905
906 void smpi_replay_run(int *argc, char***argv){
907   /* First initializes everything */
908   simgrid::smpi::Process::init(argc, argv);
909   smpi_process()->mark_as_initialized();
910   smpi_process()->set_replaying(true);
911
912   int rank = smpi_process()->index();
913   TRACE_smpi_init(rank);
914   TRACE_smpi_computing_init(rank);
915   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
916   extra->type = TRACING_INIT;
917   char *operation =bprintf("%s_init",__FUNCTION__);
918   TRACE_smpi_collective_in(rank, -1, operation, extra);
919   TRACE_smpi_collective_out(rank, -1, operation);
920   xbt_free(operation);
921   xbt_replay_action_register("init",       simgrid::smpi::action_init);
922   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
923   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
924   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
925   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
926   xbt_replay_action_register("send",       simgrid::smpi::action_send);
927   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
928   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
929   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
930   xbt_replay_action_register("test",       simgrid::smpi::action_test);
931   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
932   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
933   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
934   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
935   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
936   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
937   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
938   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
939   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
940   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
941   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
942   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
943   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
944   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
945
946   //if we have a delayed start, sleep here.
947   if(*argc>2){
948     char *endptr;
949     double value = strtod((*argv)[2], &endptr);
950     if (*endptr != '\0')
951       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
952     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
953     smpi_execute_flops(value);
954   } else {
955     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
956     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
957     smpi_execute_flops(0.0);
958   }
959
960   /* Actually run the replay */
961   simgrid::xbt::replay_runner(*argc, *argv);
962
963   /* and now, finalize everything */
964   /* One active process will stop. Decrease the counter*/
965   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
966   if (!get_reqq_self()->empty()){
967     unsigned int count_requests=get_reqq_self()->size();
968     MPI_Request requests[count_requests];
969     MPI_Status status[count_requests];
970     unsigned int i=0;
971
972     for (auto req: *get_reqq_self()){
973       requests[i] = req;
974       i++;
975     }
976     simgrid::smpi::Request::waitall(count_requests, requests, status);
977   }
978   delete get_reqq_self();
979   active_processes--;
980
981   if(active_processes==0){
982     /* Last process alive speaking: end the simulated timer */
983     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
984     xbt_free(sendbuffer);
985     xbt_free(recvbuffer);
986   }
987
988   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
989   extra_fin->type = TRACING_FINALIZE;
990   operation =bprintf("%s_finalize",__FUNCTION__);
991   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
992
993   smpi_process()->finalize();
994
995   TRACE_smpi_collective_out(rank, -1, operation);
996   TRACE_smpi_finalize(smpi_process()->index());
997   xbt_free(operation);
998 }