Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
smpi_replay: allow to register extra events before things start (+plug a memleak)
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.h"
7 #include "xbt/replay.h"
8 #include <unordered_map>
9 #include <vector>
10
11 #define KEY_SIZE (sizeof(int) * 2 + 1)
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static int sendbuffer_size=0;
23 char* sendbuffer=nullptr;
24 static int recvbuffer_size=0;
25 char* recvbuffer=nullptr;
26
27 static void log_timed_action (const char *const *action, double clock){
28   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
29     char *name = xbt_str_join_array(action, " ");
30     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
31     xbt_free(name);
32   }
33 }
34
35 static std::vector<MPI_Request>* get_reqq_self()
36 {
37   return reqq.at(smpi_process_index());
38 }
39
40 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
41 {
42    reqq.insert({smpi_process_index(), mpi_request});
43 }
44
45 //allocate a single buffer for all sends, growing it if needed
46 void* smpi_get_tmp_sendbuffer(int size)
47 {
48   if (!smpi_process_get_replaying())
49     return xbt_malloc(size);
50   if (sendbuffer_size<size){
51     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
52     sendbuffer_size=size;
53   }
54   return sendbuffer;
55 }
56
57 //allocate a single buffer for all recv
58 void* smpi_get_tmp_recvbuffer(int size){
59   if (!smpi_process_get_replaying())
60     return xbt_malloc(size);
61   if (recvbuffer_size<size){
62     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
63     recvbuffer_size=size;
64   }
65   return recvbuffer;
66 }
67
68 void smpi_free_tmp_buffer(void* buf){
69   if (!smpi_process_get_replaying())
70     xbt_free(buf);
71 }
72
73 /* Helper function */
74 static double parse_double(const char *string)
75 {
76   char *endptr;
77   double value = strtod(string, &endptr);
78   if (*endptr != '\0')
79     THROWF(unknown_error, 0, "%s is not a double", string);
80   return value;
81 }
82
83 static MPI_Datatype decode_datatype(const char *const action)
84 {
85   switch(atoi(action)) {
86     case 0:
87       MPI_CURRENT_TYPE=MPI_DOUBLE;
88       break;
89     case 1:
90       MPI_CURRENT_TYPE=MPI_INT;
91       break;
92     case 2:
93       MPI_CURRENT_TYPE=MPI_CHAR;
94       break;
95     case 3:
96       MPI_CURRENT_TYPE=MPI_SHORT;
97       break;
98     case 4:
99       MPI_CURRENT_TYPE=MPI_LONG;
100       break;
101     case 5:
102       MPI_CURRENT_TYPE=MPI_FLOAT;
103       break;
104     case 6:
105       MPI_CURRENT_TYPE=MPI_BYTE;
106       break;
107     default:
108       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
109   }
110    return MPI_CURRENT_TYPE;
111 }
112
113 const char* encode_datatype(MPI_Datatype datatype, int* known)
114 {
115   //default type for output is set to MPI_BYTE
116   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
117   if(known!=nullptr)
118     *known=1;
119   if (datatype==MPI_BYTE)
120       return "";
121   if(datatype==MPI_DOUBLE)
122       return "0";
123   if(datatype==MPI_INT)
124       return "1";
125   if(datatype==MPI_CHAR)
126       return "2";
127   if(datatype==MPI_SHORT)
128       return "3";
129   if(datatype==MPI_LONG)
130     return "4";
131   if(datatype==MPI_FLOAT)
132       return "5";
133   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
134   if(known!=nullptr)
135     *known=0;
136   // default - not implemented.
137   // do not warn here as we pass in this function even for other trace formats
138   return "-1";
139 }
140
141 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
142     int i=0;\
143     while(action[i]!=nullptr)\
144      i++;\
145     if(i<mandatory+2)                                           \
146     THROWF(arg_error, 0, "%s replay failed.\n" \
147           "%d items were given on the line. First two should be process_id and action.  " \
148           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
149           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
150   }
151
152 static void action_init(const char *const *action)
153 {
154   XBT_DEBUG("Initialize the counters");
155   CHECK_ACTION_PARAMS(action, 0, 1)
156   if(action[2]) 
157     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
158   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
159
160   /* start a simulated timer */
161   smpi_process_simulated_start();
162   /*initialize the number of active processes */
163   active_processes = smpi_process_count();
164
165   set_reqq_self(new std::vector<MPI_Request>);
166 }
167
168 static void action_finalize(const char *const *action)
169 {
170   /* Nothing to do */
171 }
172
173 static void action_comm_size(const char *const *action)
174 {
175   communicator_size = parse_double(action[2]);
176   log_timed_action (action, smpi_process_simulated_elapsed());
177 }
178
179 static void action_comm_split(const char *const *action)
180 {
181   log_timed_action (action, smpi_process_simulated_elapsed());
182 }
183
184 static void action_comm_dup(const char *const *action)
185 {
186   log_timed_action (action, smpi_process_simulated_elapsed());
187 }
188
189 static void action_compute(const char *const *action)
190 {
191   CHECK_ACTION_PARAMS(action, 1, 0)
192   double clock = smpi_process_simulated_elapsed();
193   double flops= parse_double(action[2]);
194   int rank = smpi_process_index();
195   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
196   extra->type=TRACING_COMPUTING;
197   extra->comp_size=flops;
198   TRACE_smpi_computing_in(rank, extra);
199
200   smpi_execute_flops(flops);
201
202   TRACE_smpi_computing_out(rank);
203   log_timed_action (action, clock);
204 }
205
206 static void action_send(const char *const *action)
207 {
208   CHECK_ACTION_PARAMS(action, 2, 1)
209   int to = atoi(action[2]);
210   double size=parse_double(action[3]);
211   double clock = smpi_process_simulated_elapsed();
212
213   if(action[4])
214     MPI_CURRENT_TYPE=decode_datatype(action[4]);
215   else
216     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
217
218   int rank = smpi_process_index();
219
220   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
221   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
222   extra->type = TRACING_SEND;
223   extra->send_size = size;
224   extra->src = rank;
225   extra->dst = dst_traced;
226   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
227   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
228   if (!TRACE_smpi_view_internals())
229     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
230
231   smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
232
233   log_timed_action (action, clock);
234
235   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
236 }
237
238 static void action_Isend(const char *const *action)
239 {
240   CHECK_ACTION_PARAMS(action, 2, 1)
241   int to = atoi(action[2]);
242   double size=parse_double(action[3]);
243   double clock = smpi_process_simulated_elapsed();
244
245   if(action[4]) 
246     MPI_CURRENT_TYPE=decode_datatype(action[4]);
247   else 
248     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
249
250   int rank = smpi_process_index();
251   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
252   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253   extra->type = TRACING_ISEND;
254   extra->send_size = size;
255   extra->src = rank;
256   extra->dst = dst_traced;
257   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
258   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
259   if (!TRACE_smpi_view_internals())
260     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
261
262   MPI_Request request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
263
264   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
265   request->send = 1;
266
267   get_reqq_self()->push_back(request);
268
269   log_timed_action (action, clock);
270 }
271
272 static void action_recv(const char *const *action) {
273   CHECK_ACTION_PARAMS(action, 2, 1)
274   int from = atoi(action[2]);
275   double size=parse_double(action[3]);
276   double clock = smpi_process_simulated_elapsed();
277   MPI_Status status;
278
279   if(action[4]) 
280     MPI_CURRENT_TYPE=decode_datatype(action[4]);
281   else 
282     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
283
284   int rank = smpi_process_index();
285   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
286
287   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
288   extra->type = TRACING_RECV;
289   extra->send_size = size;
290   extra->src = src_traced;
291   extra->dst = rank;
292   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
293   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
294
295   //unknown size from the receiver point of view
296   if(size<=0.0){
297     smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
298     size=status.count;
299   }
300
301   smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
302
303   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
304   if (!TRACE_smpi_view_internals()) {
305     TRACE_smpi_recv(rank, src_traced, rank, 0);
306   }
307
308   log_timed_action (action, clock);
309 }
310
311 static void action_Irecv(const char *const *action)
312 {
313   CHECK_ACTION_PARAMS(action, 2, 1)
314   int from = atoi(action[2]);
315   double size=parse_double(action[3]);
316   double clock = smpi_process_simulated_elapsed();
317
318   if(action[4]) 
319     MPI_CURRENT_TYPE=decode_datatype(action[4]);
320   else 
321     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
322
323   int rank = smpi_process_index();
324   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
325   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
326   extra->type = TRACING_IRECV;
327   extra->send_size = size;
328   extra->src = src_traced;
329   extra->dst = rank;
330   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
331   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
332   MPI_Status status;
333   //unknow size from the receiver pov
334   if(size<=0.0){
335       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
336       size=status.count;
337   }
338
339   MPI_Request request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
340
341   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
342   request->recv = 1;
343   get_reqq_self()->push_back(request);
344
345   log_timed_action (action, clock);
346 }
347
348 static void action_test(const char *const *action){
349   CHECK_ACTION_PARAMS(action, 0, 0)
350   double clock = smpi_process_simulated_elapsed();
351   MPI_Status status;
352
353   MPI_Request request = get_reqq_self()->back();
354   get_reqq_self()->pop_back();
355   //if request is null here, this may mean that a previous test has succeeded 
356   //Different times in traced application and replayed version may lead to this 
357   //In this case, ignore the extra calls.
358   if(request!=nullptr){
359     int rank = smpi_process_index();
360     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361     extra->type=TRACING_TEST;
362     TRACE_smpi_testing_in(rank, extra);
363
364     int flag = smpi_mpi_test(&request, &status);
365
366     XBT_DEBUG("MPI_Test result: %d", flag);
367     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
368     get_reqq_self()->push_back(request);
369
370     TRACE_smpi_testing_out(rank);
371   }
372   log_timed_action (action, clock);
373 }
374
375 static void action_wait(const char *const *action){
376   CHECK_ACTION_PARAMS(action, 0, 0)
377   double clock = smpi_process_simulated_elapsed();
378   MPI_Status status;
379
380   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
381       xbt_str_join_array(action," "));
382   MPI_Request request = get_reqq_self()->back();
383   get_reqq_self()->pop_back();
384
385   if (request==nullptr){
386     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
387     return;
388   }
389
390   int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
391
392   MPI_Group group = smpi_comm_group(request->comm);
393   int src_traced = smpi_group_rank(group, request->src);
394   int dst_traced = smpi_group_rank(group, request->dst);
395   int is_wait_for_receive = request->recv;
396   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
397   extra->type = TRACING_WAIT;
398   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
399
400   smpi_mpi_wait(&request, &status);
401
402   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
403   if (is_wait_for_receive)
404     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
405   log_timed_action (action, clock);
406 }
407
408 static void action_waitall(const char *const *action){
409   CHECK_ACTION_PARAMS(action, 0, 0)
410   double clock = smpi_process_simulated_elapsed();
411   unsigned int count_requests=get_reqq_self()->size();
412
413   if (count_requests>0) {
414     MPI_Status status[count_requests];
415
416    int rank_traced = smpi_process_index();
417    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
418    extra->type = TRACING_WAITALL;
419    extra->send_size=count_requests;
420    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
421    int recvs_snd[count_requests];
422    int recvs_rcv[count_requests];
423    unsigned int i=0;
424    for (auto req : *(get_reqq_self())){
425      if (req && req->recv){
426        recvs_snd[i]=req->src;
427        recvs_rcv[i]=req->dst;
428      }else
429        recvs_snd[i]=-100;
430      i++;
431    }
432    smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
433
434    for (i=0; i<count_requests;i++){
435      if (recvs_snd[i]!=-100)
436        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
437    }
438    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
439   }
440   log_timed_action (action, clock);
441 }
442
443 static void action_barrier(const char *const *action){
444   double clock = smpi_process_simulated_elapsed();
445   int rank = smpi_process_index();
446   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
447   extra->type = TRACING_BARRIER;
448   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
449
450   mpi_coll_barrier_fun(MPI_COMM_WORLD);
451
452   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
453   log_timed_action (action, clock);
454 }
455
456 static void action_bcast(const char *const *action)
457 {
458   CHECK_ACTION_PARAMS(action, 1, 2)
459   double size = parse_double(action[2]);
460   double clock = smpi_process_simulated_elapsed();
461   int root=0;
462   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
463   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
464
465   if(action[3]) {
466     root= atoi(action[3]);
467     if(action[4])
468       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
469   }
470
471   int rank = smpi_process_index();
472   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
473
474   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
475   extra->type = TRACING_BCAST;
476   extra->send_size = size;
477   extra->root = root_traced;
478   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
479   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
480   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
481
482   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
483
484   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
485   log_timed_action (action, clock);
486 }
487
488 static void action_reduce(const char *const *action)
489 {
490   CHECK_ACTION_PARAMS(action, 2, 2)
491   double comm_size = parse_double(action[2]);
492   double comp_size = parse_double(action[3]);
493   double clock = smpi_process_simulated_elapsed();
494   int root=0;
495   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
496
497   if(action[4]) {
498     root= atoi(action[4]);
499     if(action[5])
500       MPI_CURRENT_TYPE=decode_datatype(action[5]);
501   }
502
503   int rank = smpi_process_index();
504   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
505   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
506   extra->type = TRACING_REDUCE;
507   extra->send_size = comm_size;
508   extra->comp_size = comp_size;
509   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
510   extra->root = root_traced;
511
512   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
513
514   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
515   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
516   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
517   smpi_execute_flops(comp_size);
518
519   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
520   log_timed_action (action, clock);
521 }
522
523 static void action_allReduce(const char *const *action) {
524   CHECK_ACTION_PARAMS(action, 2, 1)
525   double comm_size = parse_double(action[2]);
526   double comp_size = parse_double(action[3]);
527
528   if(action[4])
529     MPI_CURRENT_TYPE=decode_datatype(action[4]);
530   else
531     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
532
533   double clock = smpi_process_simulated_elapsed();
534   int rank = smpi_process_index();
535   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
536   extra->type = TRACING_ALLREDUCE;
537   extra->send_size = comm_size;
538   extra->comp_size = comp_size;
539   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
540   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
541
542   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
543   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
544   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
545   smpi_execute_flops(comp_size);
546
547   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
548   log_timed_action (action, clock);
549 }
550
551 static void action_allToAll(const char *const *action) {
552   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
553   double clock = smpi_process_simulated_elapsed();
554   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
555   int send_size = parse_double(action[2]);
556   int recv_size = parse_double(action[3]);
557   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
558
559   if(action[4] && action[5]) {
560     MPI_CURRENT_TYPE=decode_datatype(action[4]);
561     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
562   }
563   else
564     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
565
566   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
567   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
568
569   int rank = smpi_process_index();
570   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
571   extra->type = TRACING_ALLTOALL;
572   extra->send_size = send_size;
573   extra->recv_size = recv_size;
574   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
575   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
576
577   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
578
579   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
580
581   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
582   log_timed_action (action, clock);
583 }
584
585 static void action_gather(const char *const *action) {
586   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
587         0 gather 68 68 0 0 0
588       where:
589         1) 68 is the sendcounts
590         2) 68 is the recvcounts
591         3) 0 is the root node
592         4) 0 is the send datatype id, see decode_datatype()
593         5) 0 is the recv datatype id, see decode_datatype()
594   */
595   CHECK_ACTION_PARAMS(action, 2, 3)
596   double clock = smpi_process_simulated_elapsed();
597   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
598   int send_size = parse_double(action[2]);
599   int recv_size = parse_double(action[3]);
600   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
601   if(action[4] && action[5]) {
602     MPI_CURRENT_TYPE=decode_datatype(action[5]);
603     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
604   } else {
605     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
606   }
607   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
608   void *recv = nullptr;
609   int root=0;
610   if(action[4])
611     root=atoi(action[4]);
612   int rank = smpi_comm_rank(MPI_COMM_WORLD);
613
614   if(rank==root)
615     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
616
617   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
618   extra->type = TRACING_GATHER;
619   extra->send_size = send_size;
620   extra->recv_size = recv_size;
621   extra->root = root;
622   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
623   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
624
625   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
626
627   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
628
629   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
630   log_timed_action (action, clock);
631 }
632
633 static void action_gatherv(const char *const *action) {
634   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
635        0 gather 68 68 10 10 10 0 0 0
636      where:
637        1) 68 is the sendcount
638        2) 68 10 10 10 is the recvcounts
639        3) 0 is the root node
640        4) 0 is the send datatype id, see decode_datatype()
641        5) 0 is the recv datatype id, see decode_datatype()
642   */
643   double clock = smpi_process_simulated_elapsed();
644   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
645   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
646   int send_size = parse_double(action[2]);
647   int disps[comm_size];
648   int recvcounts[comm_size];
649   int recv_sum=0;
650
651   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
652   if(action[4+comm_size] && action[5+comm_size]) {
653     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
654     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
655   } else
656     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
657
658   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
659   void *recv = nullptr;
660   for(int i=0;i<comm_size;i++) {
661     recvcounts[i] = atoi(action[i+3]);
662     recv_sum=recv_sum+recvcounts[i];
663     disps[i]=0;
664   }
665
666   int root=atoi(action[3+comm_size]);
667   int rank = smpi_comm_rank(MPI_COMM_WORLD);
668
669   if(rank==root)
670     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
671
672   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
673   extra->type = TRACING_GATHERV;
674   extra->send_size = send_size;
675   extra->recvcounts= xbt_new(int,comm_size);
676   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
677     extra->recvcounts[i] = recvcounts[i];
678   extra->root = root;
679   extra->num_processes = comm_size;
680   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
681   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
682
683   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
684
685   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
686
687   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
688   log_timed_action (action, clock);
689 }
690
691 static void action_reducescatter(const char *const *action) {
692  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
693       0 reduceScatter 275427 275427 275427 204020 11346849 0
694     where:
695       1) The first four values after the name of the action declare the recvcounts array
696       2) The value 11346849 is the amount of instructions
697       3) The last value corresponds to the datatype, see decode_datatype().
698 */
699   double clock = smpi_process_simulated_elapsed();
700   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
701   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
702   int comp_size = parse_double(action[2+comm_size]);
703   int recvcounts[comm_size];
704   int rank = smpi_process_index();
705   int size = 0;
706   if(action[3+comm_size])
707     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
708   else
709     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
710
711   for(int i=0;i<comm_size;i++) {
712     recvcounts[i] = atoi(action[i+2]);
713     size+=recvcounts[i];
714   }
715
716   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
717   extra->type = TRACING_REDUCE_SCATTER;
718   extra->send_size = 0;
719   extra->recvcounts= xbt_new(int, comm_size);
720   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
721     extra->recvcounts[i] = recvcounts[i];
722   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
723   extra->comp_size = comp_size;
724   extra->num_processes = comm_size;
725
726   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
727
728   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
729   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
730
731   mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
732   smpi_execute_flops(comp_size);
733
734   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
735   log_timed_action (action, clock);
736 }
737
738 static void action_allgather(const char *const *action) {
739   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
740         0 allGather 275427 275427
741     where:
742         1) 275427 is the sendcount
743         2) 275427 is the recvcount
744         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
745   */
746   double clock = smpi_process_simulated_elapsed();
747
748   CHECK_ACTION_PARAMS(action, 2, 2)
749   int sendcount=atoi(action[2]); 
750   int recvcount=atoi(action[3]); 
751
752   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
753
754   if(action[4] && action[5]) {
755     MPI_CURRENT_TYPE = decode_datatype(action[4]);
756     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
757   } else
758     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
759
760   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
761   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
762
763   int rank = smpi_process_index();
764   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
765   extra->type = TRACING_ALLGATHER;
766   extra->send_size = sendcount;
767   extra->recv_size= recvcount;
768   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
769   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
770   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
771
772   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
773
774   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
775
776   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
777   log_timed_action (action, clock);
778 }
779
780 static void action_allgatherv(const char *const *action) {
781   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
782         0 allGatherV 275427 275427 275427 275427 204020
783      where:
784         1) 275427 is the sendcount
785         2) The next four elements declare the recvcounts array
786         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
787   */
788   double clock = smpi_process_simulated_elapsed();
789
790   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
791   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
792   int sendcount=atoi(action[2]);
793   int recvcounts[comm_size];
794   int disps[comm_size];
795   int recv_sum=0;
796   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
797
798   if(action[3+comm_size] && action[4+comm_size]) {
799     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
800     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
801   } else
802     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
803
804   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
805
806   for(int i=0;i<comm_size;i++) {
807     recvcounts[i] = atoi(action[i+3]);
808     recv_sum=recv_sum+recvcounts[i];
809     disps[i] = 0;
810   }
811   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
812
813   int rank = smpi_process_index();
814   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
815   extra->type = TRACING_ALLGATHERV;
816   extra->send_size = sendcount;
817   extra->recvcounts= xbt_new(int, comm_size);
818   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
819     extra->recvcounts[i] = recvcounts[i];
820   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
821   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
822   extra->num_processes = comm_size;
823
824   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
825
826   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
827                           MPI_COMM_WORLD);
828
829   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
830   log_timed_action (action, clock);
831 }
832
833 static void action_allToAllv(const char *const *action) {
834   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
835         0 allToAllV 100 1 7 10 12 100 1 70 10 5
836      where:
837         1) 100 is the size of the send buffer *sizeof(int),
838         2) 1 7 10 12 is the sendcounts array
839         3) 100*sizeof(int) is the size of the receiver buffer
840         4)  1 70 10 5 is the recvcounts array
841   */
842   double clock = smpi_process_simulated_elapsed();
843
844   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
845   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
846   int sendcounts[comm_size];
847   int recvcounts[comm_size];
848   int senddisps[comm_size];
849   int recvdisps[comm_size];
850
851   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
852
853   int send_buf_size=parse_double(action[2]);
854   int recv_buf_size=parse_double(action[3+comm_size]);
855   if(action[4+2*comm_size] && action[5+2*comm_size]) {
856     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
857     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
858   }
859   else
860     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
861
862   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
863   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
864
865   for(int i=0;i<comm_size;i++) {
866     sendcounts[i] = atoi(action[i+3]);
867     recvcounts[i] = atoi(action[i+4+comm_size]);
868     senddisps[i] = 0;
869     recvdisps[i] = 0;
870   }
871
872   int rank = smpi_process_index();
873   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
874   extra->type = TRACING_ALLTOALLV;
875   extra->recvcounts= xbt_new(int, comm_size);
876   extra->sendcounts= xbt_new(int, comm_size);
877   extra->num_processes = comm_size;
878
879   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
880     extra->send_size += sendcounts[i];
881     extra->sendcounts[i] = sendcounts[i];
882     extra->recv_size += recvcounts[i];
883     extra->recvcounts[i] = recvcounts[i];
884   }
885   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
886   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
887
888   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
889
890   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
891                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
892
893   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
894   log_timed_action (action, clock);
895 }
896
897 void smpi_replay_run(int *argc, char***argv){
898   /* First initializes everything */
899   smpi_process_init(argc, argv);
900   smpi_process_mark_as_initialized();
901   smpi_process_set_replaying(true);
902
903   int rank = smpi_process_index();
904   TRACE_smpi_init(rank);
905   TRACE_smpi_computing_init(rank);
906   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
907   extra->type = TRACING_INIT;
908   char *operation =bprintf("%s_init",__FUNCTION__);
909   TRACE_smpi_collective_in(rank, -1, operation, extra);
910   TRACE_smpi_collective_out(rank, -1, operation);
911   xbt_free(operation);
912
913   if (_xbt_replay_action_init()==0) {
914     xbt_replay_action_register("init",       action_init);
915     xbt_replay_action_register("finalize",   action_finalize);
916     xbt_replay_action_register("comm_size",  action_comm_size);
917     xbt_replay_action_register("comm_split", action_comm_split);
918     xbt_replay_action_register("comm_dup",   action_comm_dup);
919     xbt_replay_action_register("send",       action_send);
920     xbt_replay_action_register("Isend",      action_Isend);
921     xbt_replay_action_register("recv",       action_recv);
922     xbt_replay_action_register("Irecv",      action_Irecv);
923     xbt_replay_action_register("test",       action_test);
924     xbt_replay_action_register("wait",       action_wait);
925     xbt_replay_action_register("waitAll",    action_waitall);
926     xbt_replay_action_register("barrier",    action_barrier);
927     xbt_replay_action_register("bcast",      action_bcast);
928     xbt_replay_action_register("reduce",     action_reduce);
929     xbt_replay_action_register("allReduce",  action_allReduce);
930     xbt_replay_action_register("allToAll",   action_allToAll);
931     xbt_replay_action_register("allToAllV",  action_allToAllv);
932     xbt_replay_action_register("gather",  action_gather);
933     xbt_replay_action_register("gatherV",  action_gatherv);
934     xbt_replay_action_register("allGather",  action_allgather);
935     xbt_replay_action_register("allGatherV",  action_allgatherv);
936     xbt_replay_action_register("reduceScatter",  action_reducescatter);
937     xbt_replay_action_register("compute",    action_compute);
938   }
939
940   //if we have a delayed start, sleep here.
941   if(*argc>2){
942     char *endptr;
943     double value = strtod((*argv)[2], &endptr);
944     if (*endptr != '\0')
945       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
946     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
947     smpi_execute_flops(value);
948   } else {
949     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
950     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
951     smpi_execute_flops(0.0);
952   }
953
954   /* Actually run the replay */
955   xbt_replay_action_runner(*argc, *argv);
956
957   /* and now, finalize everything */
958   /* One active process will stop. Decrease the counter*/
959   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
960   if (!get_reqq_self()->empty()){
961     unsigned int count_requests=get_reqq_self()->size();
962     MPI_Request requests[count_requests];
963     MPI_Status status[count_requests];
964     unsigned int i=0;
965
966     for (auto req: *get_reqq_self()){
967       requests[i] = req;
968       i++;
969     }
970     smpi_mpi_waitall(count_requests, requests, status);
971   }
972   delete get_reqq_self();
973   active_processes--;
974
975   if(active_processes==0){
976     /* Last process alive speaking: end the simulated timer */
977     XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
978     _xbt_replay_action_exit();
979     xbt_free(sendbuffer);
980     xbt_free(recvbuffer);
981   }
982
983   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
984   extra_fin->type = TRACING_FINALIZE;
985   operation =bprintf("%s_finalize",__FUNCTION__);
986   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
987
988   smpi_process_finalize();
989
990   TRACE_smpi_collective_out(rank, -1, operation);
991   TRACE_smpi_finalize(smpi_process_index());
992   smpi_process_destroy();
993   xbt_free(operation);
994 }