Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
93404ccf3c0aaee7b442c175a8bd736582642c45
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.h"
7 #include "xbt/replay.h"
8 #include "src/smpi/smpi_group.hpp"
9 #include <unordered_map>
10 #include <vector>
11
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15
16 int communicator_size = 0;
17 static int active_processes = 0;
18 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
19
20 MPI_Datatype MPI_DEFAULT_TYPE;
21 MPI_Datatype MPI_CURRENT_TYPE;
22
23 static int sendbuffer_size=0;
24 char* sendbuffer=nullptr;
25 static int recvbuffer_size=0;
26 char* recvbuffer=nullptr;
27
28 static void log_timed_action (const char *const *action, double clock){
29   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30     char *name = xbt_str_join_array(action, " ");
31     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
32     xbt_free(name);
33   }
34 }
35
36 static std::vector<MPI_Request>* get_reqq_self()
37 {
38   return reqq.at(smpi_process_index());
39 }
40
41 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
42 {
43    reqq.insert({smpi_process_index(), mpi_request});
44 }
45
46 //allocate a single buffer for all sends, growing it if needed
47 void* smpi_get_tmp_sendbuffer(int size)
48 {
49   if (!smpi_process_get_replaying())
50     return xbt_malloc(size);
51   if (sendbuffer_size<size){
52     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
53     sendbuffer_size=size;
54   }
55   return sendbuffer;
56 }
57
58 //allocate a single buffer for all recv
59 void* smpi_get_tmp_recvbuffer(int size){
60   if (!smpi_process_get_replaying())
61     return xbt_malloc(size);
62   if (recvbuffer_size<size){
63     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
64     recvbuffer_size=size;
65   }
66   return recvbuffer;
67 }
68
69 void smpi_free_tmp_buffer(void* buf){
70   if (!smpi_process_get_replaying())
71     xbt_free(buf);
72 }
73
74 /* Helper function */
75 static double parse_double(const char *string)
76 {
77   char *endptr;
78   double value = strtod(string, &endptr);
79   if (*endptr != '\0')
80     THROWF(unknown_error, 0, "%s is not a double", string);
81   return value;
82 }
83
84 static MPI_Datatype decode_datatype(const char *const action)
85 {
86   switch(atoi(action)) {
87     case 0:
88       MPI_CURRENT_TYPE=MPI_DOUBLE;
89       break;
90     case 1:
91       MPI_CURRENT_TYPE=MPI_INT;
92       break;
93     case 2:
94       MPI_CURRENT_TYPE=MPI_CHAR;
95       break;
96     case 3:
97       MPI_CURRENT_TYPE=MPI_SHORT;
98       break;
99     case 4:
100       MPI_CURRENT_TYPE=MPI_LONG;
101       break;
102     case 5:
103       MPI_CURRENT_TYPE=MPI_FLOAT;
104       break;
105     case 6:
106       MPI_CURRENT_TYPE=MPI_BYTE;
107       break;
108     default:
109       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
110   }
111    return MPI_CURRENT_TYPE;
112 }
113
114 const char* encode_datatype(MPI_Datatype datatype, int* known)
115 {
116   //default type for output is set to MPI_BYTE
117   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
118   if(known!=nullptr)
119     *known=1;
120   if (datatype==MPI_BYTE)
121       return "";
122   if(datatype==MPI_DOUBLE)
123       return "0";
124   if(datatype==MPI_INT)
125       return "1";
126   if(datatype==MPI_CHAR)
127       return "2";
128   if(datatype==MPI_SHORT)
129       return "3";
130   if(datatype==MPI_LONG)
131     return "4";
132   if(datatype==MPI_FLOAT)
133       return "5";
134   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
135   if(known!=nullptr)
136     *known=0;
137   // default - not implemented.
138   // do not warn here as we pass in this function even for other trace formats
139   return "-1";
140 }
141
142 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
143     int i=0;\
144     while(action[i]!=nullptr)\
145      i++;\
146     if(i<mandatory+2)                                           \
147     THROWF(arg_error, 0, "%s replay failed.\n" \
148           "%d items were given on the line. First two should be process_id and action.  " \
149           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
150           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
151   }
152
153 static void action_init(const char *const *action)
154 {
155   XBT_DEBUG("Initialize the counters");
156   CHECK_ACTION_PARAMS(action, 0, 1)
157   if(action[2]) 
158     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
159   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
160
161   /* start a simulated timer */
162   smpi_process_simulated_start();
163   /*initialize the number of active processes */
164   active_processes = smpi_process_count();
165
166   set_reqq_self(new std::vector<MPI_Request>);
167 }
168
169 static void action_finalize(const char *const *action)
170 {
171   /* Nothing to do */
172 }
173
174 static void action_comm_size(const char *const *action)
175 {
176   communicator_size = parse_double(action[2]);
177   log_timed_action (action, smpi_process_simulated_elapsed());
178 }
179
180 static void action_comm_split(const char *const *action)
181 {
182   log_timed_action (action, smpi_process_simulated_elapsed());
183 }
184
185 static void action_comm_dup(const char *const *action)
186 {
187   log_timed_action (action, smpi_process_simulated_elapsed());
188 }
189
190 static void action_compute(const char *const *action)
191 {
192   CHECK_ACTION_PARAMS(action, 1, 0)
193   double clock = smpi_process_simulated_elapsed();
194   double flops= parse_double(action[2]);
195   int rank = smpi_process_index();
196   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
197   extra->type=TRACING_COMPUTING;
198   extra->comp_size=flops;
199   TRACE_smpi_computing_in(rank, extra);
200
201   smpi_execute_flops(flops);
202
203   TRACE_smpi_computing_out(rank);
204   log_timed_action (action, clock);
205 }
206
207 static void action_send(const char *const *action)
208 {
209   CHECK_ACTION_PARAMS(action, 2, 1)
210   int to = atoi(action[2]);
211   double size=parse_double(action[3]);
212   double clock = smpi_process_simulated_elapsed();
213
214   if(action[4])
215     MPI_CURRENT_TYPE=decode_datatype(action[4]);
216   else
217     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
218
219   int rank = smpi_process_index();
220
221   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
222   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
223   extra->type = TRACING_SEND;
224   extra->send_size = size;
225   extra->src = rank;
226   extra->dst = dst_traced;
227   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
228   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
229   if (!TRACE_smpi_view_internals())
230     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
231
232   smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
233
234   log_timed_action (action, clock);
235
236   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
237 }
238
239 static void action_Isend(const char *const *action)
240 {
241   CHECK_ACTION_PARAMS(action, 2, 1)
242   int to = atoi(action[2]);
243   double size=parse_double(action[3]);
244   double clock = smpi_process_simulated_elapsed();
245
246   if(action[4]) 
247     MPI_CURRENT_TYPE=decode_datatype(action[4]);
248   else 
249     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250
251   int rank = smpi_process_index();
252   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
253   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
254   extra->type = TRACING_ISEND;
255   extra->send_size = size;
256   extra->src = rank;
257   extra->dst = dst_traced;
258   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
259   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
260   if (!TRACE_smpi_view_internals())
261     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
262
263   MPI_Request request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
264
265   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
266   request->send = 1;
267
268   get_reqq_self()->push_back(request);
269
270   log_timed_action (action, clock);
271 }
272
273 static void action_recv(const char *const *action) {
274   CHECK_ACTION_PARAMS(action, 2, 1)
275   int from = atoi(action[2]);
276   double size=parse_double(action[3]);
277   double clock = smpi_process_simulated_elapsed();
278   MPI_Status status;
279
280   if(action[4]) 
281     MPI_CURRENT_TYPE=decode_datatype(action[4]);
282   else 
283     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
284
285   int rank = smpi_process_index();
286   int src_traced = MPI_COMM_WORLD->group()->rank(from);
287
288   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
289   extra->type = TRACING_RECV;
290   extra->send_size = size;
291   extra->src = src_traced;
292   extra->dst = rank;
293   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
294   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
295
296   //unknown size from the receiver point of view
297   if(size<=0.0){
298     smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
299     size=status.count;
300   }
301
302   smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
303
304   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
305   if (!TRACE_smpi_view_internals()) {
306     TRACE_smpi_recv(rank, src_traced, rank, 0);
307   }
308
309   log_timed_action (action, clock);
310 }
311
312 static void action_Irecv(const char *const *action)
313 {
314   CHECK_ACTION_PARAMS(action, 2, 1)
315   int from = atoi(action[2]);
316   double size=parse_double(action[3]);
317   double clock = smpi_process_simulated_elapsed();
318
319   if(action[4]) 
320     MPI_CURRENT_TYPE=decode_datatype(action[4]);
321   else 
322     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
323
324   int rank = smpi_process_index();
325   int src_traced = MPI_COMM_WORLD->group()->rank(from);
326   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
327   extra->type = TRACING_IRECV;
328   extra->send_size = size;
329   extra->src = src_traced;
330   extra->dst = rank;
331   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
332   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
333   MPI_Status status;
334   //unknow size from the receiver pov
335   if(size<=0.0){
336       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
337       size=status.count;
338   }
339
340   MPI_Request request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
341
342   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
343   request->recv = 1;
344   get_reqq_self()->push_back(request);
345
346   log_timed_action (action, clock);
347 }
348
349 static void action_test(const char *const *action){
350   CHECK_ACTION_PARAMS(action, 0, 0)
351   double clock = smpi_process_simulated_elapsed();
352   MPI_Status status;
353
354   MPI_Request request = get_reqq_self()->back();
355   get_reqq_self()->pop_back();
356   //if request is null here, this may mean that a previous test has succeeded 
357   //Different times in traced application and replayed version may lead to this 
358   //In this case, ignore the extra calls.
359   if(request!=nullptr){
360     int rank = smpi_process_index();
361     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
362     extra->type=TRACING_TEST;
363     TRACE_smpi_testing_in(rank, extra);
364
365     int flag = smpi_mpi_test(&request, &status);
366
367     XBT_DEBUG("MPI_Test result: %d", flag);
368     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
369     get_reqq_self()->push_back(request);
370
371     TRACE_smpi_testing_out(rank);
372   }
373   log_timed_action (action, clock);
374 }
375
376 static void action_wait(const char *const *action){
377   CHECK_ACTION_PARAMS(action, 0, 0)
378   double clock = smpi_process_simulated_elapsed();
379   MPI_Status status;
380
381   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
382       xbt_str_join_array(action," "));
383   MPI_Request request = get_reqq_self()->back();
384   get_reqq_self()->pop_back();
385
386   if (request==nullptr){
387     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
388     return;
389   }
390
391   int rank = request->comm != MPI_COMM_NULL ? request->comm->rank() : -1;
392
393   MPI_Group group = request->comm->group();
394   int src_traced = group->rank(request->src);
395   int dst_traced = group->rank(request->dst);
396   int is_wait_for_receive = request->recv;
397   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
398   extra->type = TRACING_WAIT;
399   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
400
401   smpi_mpi_wait(&request, &status);
402
403   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
404   if (is_wait_for_receive)
405     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
406   log_timed_action (action, clock);
407 }
408
409 static void action_waitall(const char *const *action){
410   CHECK_ACTION_PARAMS(action, 0, 0)
411   double clock = smpi_process_simulated_elapsed();
412   unsigned int count_requests=get_reqq_self()->size();
413
414   if (count_requests>0) {
415     MPI_Status status[count_requests];
416
417    int rank_traced = smpi_process_index();
418    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
419    extra->type = TRACING_WAITALL;
420    extra->send_size=count_requests;
421    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
422    int recvs_snd[count_requests];
423    int recvs_rcv[count_requests];
424    unsigned int i=0;
425    for (auto req : *(get_reqq_self())){
426      if (req && req->recv){
427        recvs_snd[i]=req->src;
428        recvs_rcv[i]=req->dst;
429      }else
430        recvs_snd[i]=-100;
431      i++;
432    }
433    smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
434
435    for (i=0; i<count_requests;i++){
436      if (recvs_snd[i]!=-100)
437        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
438    }
439    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
440   }
441   log_timed_action (action, clock);
442 }
443
444 static void action_barrier(const char *const *action){
445   double clock = smpi_process_simulated_elapsed();
446   int rank = smpi_process_index();
447   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
448   extra->type = TRACING_BARRIER;
449   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
450
451   mpi_coll_barrier_fun(MPI_COMM_WORLD);
452
453   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
454   log_timed_action (action, clock);
455 }
456
457 static void action_bcast(const char *const *action)
458 {
459   CHECK_ACTION_PARAMS(action, 1, 2)
460   double size = parse_double(action[2]);
461   double clock = smpi_process_simulated_elapsed();
462   int root=0;
463   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
464   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
465
466   if(action[3]) {
467     root= atoi(action[3]);
468     if(action[4])
469       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
470   }
471
472   int rank = smpi_process_index();
473   int root_traced = MPI_COMM_WORLD->group()->index(root);
474
475   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
476   extra->type = TRACING_BCAST;
477   extra->send_size = size;
478   extra->root = root_traced;
479   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
480   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
481   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
482
483   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
484
485   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
486   log_timed_action (action, clock);
487 }
488
489 static void action_reduce(const char *const *action)
490 {
491   CHECK_ACTION_PARAMS(action, 2, 2)
492   double comm_size = parse_double(action[2]);
493   double comp_size = parse_double(action[3]);
494   double clock = smpi_process_simulated_elapsed();
495   int root=0;
496   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
497
498   if(action[4]) {
499     root= atoi(action[4]);
500     if(action[5])
501       MPI_CURRENT_TYPE=decode_datatype(action[5]);
502   }
503
504   int rank = smpi_process_index();
505   int root_traced = MPI_COMM_WORLD->group()->rank(root);
506   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
507   extra->type = TRACING_REDUCE;
508   extra->send_size = comm_size;
509   extra->comp_size = comp_size;
510   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
511   extra->root = root_traced;
512
513   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
514
515   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
516   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
517   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
518   smpi_execute_flops(comp_size);
519
520   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
521   log_timed_action (action, clock);
522 }
523
524 static void action_allReduce(const char *const *action) {
525   CHECK_ACTION_PARAMS(action, 2, 1)
526   double comm_size = parse_double(action[2]);
527   double comp_size = parse_double(action[3]);
528
529   if(action[4])
530     MPI_CURRENT_TYPE=decode_datatype(action[4]);
531   else
532     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
533
534   double clock = smpi_process_simulated_elapsed();
535   int rank = smpi_process_index();
536   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537   extra->type = TRACING_ALLREDUCE;
538   extra->send_size = comm_size;
539   extra->comp_size = comp_size;
540   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
541   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
542
543   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
544   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
545   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
546   smpi_execute_flops(comp_size);
547
548   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
549   log_timed_action (action, clock);
550 }
551
552 static void action_allToAll(const char *const *action) {
553   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
554   double clock = smpi_process_simulated_elapsed();
555   int comm_size = MPI_COMM_WORLD->size();
556   int send_size = parse_double(action[2]);
557   int recv_size = parse_double(action[3]);
558   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
559
560   if(action[4] && action[5]) {
561     MPI_CURRENT_TYPE=decode_datatype(action[4]);
562     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
563   }
564   else
565     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
566
567   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
568   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
569
570   int rank = smpi_process_index();
571   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
572   extra->type = TRACING_ALLTOALL;
573   extra->send_size = send_size;
574   extra->recv_size = recv_size;
575   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
576   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
577
578   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
579
580   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
581
582   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
583   log_timed_action (action, clock);
584 }
585
586 static void action_gather(const char *const *action) {
587   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
588         0 gather 68 68 0 0 0
589       where:
590         1) 68 is the sendcounts
591         2) 68 is the recvcounts
592         3) 0 is the root node
593         4) 0 is the send datatype id, see decode_datatype()
594         5) 0 is the recv datatype id, see decode_datatype()
595   */
596   CHECK_ACTION_PARAMS(action, 2, 3)
597   double clock = smpi_process_simulated_elapsed();
598   int comm_size = MPI_COMM_WORLD->size();
599   int send_size = parse_double(action[2]);
600   int recv_size = parse_double(action[3]);
601   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
602   if(action[4] && action[5]) {
603     MPI_CURRENT_TYPE=decode_datatype(action[5]);
604     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
605   } else {
606     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
607   }
608   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
609   void *recv = nullptr;
610   int root=0;
611   if(action[4])
612     root=atoi(action[4]);
613   int rank = MPI_COMM_WORLD->rank();
614
615   if(rank==root)
616     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
617
618   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
619   extra->type = TRACING_GATHER;
620   extra->send_size = send_size;
621   extra->recv_size = recv_size;
622   extra->root = root;
623   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
624   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
625
626   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
627
628   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
629
630   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
631   log_timed_action (action, clock);
632 }
633
634 static void action_gatherv(const char *const *action) {
635   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
636        0 gather 68 68 10 10 10 0 0 0
637      where:
638        1) 68 is the sendcount
639        2) 68 10 10 10 is the recvcounts
640        3) 0 is the root node
641        4) 0 is the send datatype id, see decode_datatype()
642        5) 0 is the recv datatype id, see decode_datatype()
643   */
644   double clock = smpi_process_simulated_elapsed();
645   int comm_size = MPI_COMM_WORLD->size();
646   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
647   int send_size = parse_double(action[2]);
648   int disps[comm_size];
649   int recvcounts[comm_size];
650   int recv_sum=0;
651
652   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
653   if(action[4+comm_size] && action[5+comm_size]) {
654     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
655     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
656   } else
657     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
658
659   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
660   void *recv = nullptr;
661   for(int i=0;i<comm_size;i++) {
662     recvcounts[i] = atoi(action[i+3]);
663     recv_sum=recv_sum+recvcounts[i];
664     disps[i]=0;
665   }
666
667   int root=atoi(action[3+comm_size]);
668   int rank = MPI_COMM_WORLD->rank();
669
670   if(rank==root)
671     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
672
673   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
674   extra->type = TRACING_GATHERV;
675   extra->send_size = send_size;
676   extra->recvcounts= xbt_new(int,comm_size);
677   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
678     extra->recvcounts[i] = recvcounts[i];
679   extra->root = root;
680   extra->num_processes = comm_size;
681   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
682   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
683
684   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
685
686   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
687
688   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
689   log_timed_action (action, clock);
690 }
691
692 static void action_reducescatter(const char *const *action) {
693  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
694       0 reduceScatter 275427 275427 275427 204020 11346849 0
695     where:
696       1) The first four values after the name of the action declare the recvcounts array
697       2) The value 11346849 is the amount of instructions
698       3) The last value corresponds to the datatype, see decode_datatype().
699 */
700   double clock = smpi_process_simulated_elapsed();
701   int comm_size = MPI_COMM_WORLD->size();
702   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
703   int comp_size = parse_double(action[2+comm_size]);
704   int recvcounts[comm_size];
705   int rank = smpi_process_index();
706   int size = 0;
707   if(action[3+comm_size])
708     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
709   else
710     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
711
712   for(int i=0;i<comm_size;i++) {
713     recvcounts[i] = atoi(action[i+2]);
714     size+=recvcounts[i];
715   }
716
717   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
718   extra->type = TRACING_REDUCE_SCATTER;
719   extra->send_size = 0;
720   extra->recvcounts= xbt_new(int, comm_size);
721   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
722     extra->recvcounts[i] = recvcounts[i];
723   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
724   extra->comp_size = comp_size;
725   extra->num_processes = comm_size;
726
727   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
728
729   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
730   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
731
732   mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
733   smpi_execute_flops(comp_size);
734
735   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
736   log_timed_action (action, clock);
737 }
738
739 static void action_allgather(const char *const *action) {
740   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
741         0 allGather 275427 275427
742     where:
743         1) 275427 is the sendcount
744         2) 275427 is the recvcount
745         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
746   */
747   double clock = smpi_process_simulated_elapsed();
748
749   CHECK_ACTION_PARAMS(action, 2, 2)
750   int sendcount=atoi(action[2]); 
751   int recvcount=atoi(action[3]); 
752
753   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
754
755   if(action[4] && action[5]) {
756     MPI_CURRENT_TYPE = decode_datatype(action[4]);
757     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
758   } else
759     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
760
761   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
762   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
763
764   int rank = smpi_process_index();
765   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
766   extra->type = TRACING_ALLGATHER;
767   extra->send_size = sendcount;
768   extra->recv_size= recvcount;
769   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
770   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
771   extra->num_processes = MPI_COMM_WORLD->size();
772
773   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
774
775   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
776
777   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
778   log_timed_action (action, clock);
779 }
780
781 static void action_allgatherv(const char *const *action) {
782   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
783         0 allGatherV 275427 275427 275427 275427 204020
784      where:
785         1) 275427 is the sendcount
786         2) The next four elements declare the recvcounts array
787         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
788   */
789   double clock = smpi_process_simulated_elapsed();
790
791   int comm_size = MPI_COMM_WORLD->size();
792   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
793   int sendcount=atoi(action[2]);
794   int recvcounts[comm_size];
795   int disps[comm_size];
796   int recv_sum=0;
797   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
798
799   if(action[3+comm_size] && action[4+comm_size]) {
800     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
801     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
802   } else
803     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
804
805   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
806
807   for(int i=0;i<comm_size;i++) {
808     recvcounts[i] = atoi(action[i+3]);
809     recv_sum=recv_sum+recvcounts[i];
810     disps[i] = 0;
811   }
812   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
813
814   int rank = smpi_process_index();
815   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
816   extra->type = TRACING_ALLGATHERV;
817   extra->send_size = sendcount;
818   extra->recvcounts= xbt_new(int, comm_size);
819   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
820     extra->recvcounts[i] = recvcounts[i];
821   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
822   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
823   extra->num_processes = comm_size;
824
825   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
826
827   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
828                           MPI_COMM_WORLD);
829
830   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
831   log_timed_action (action, clock);
832 }
833
834 static void action_allToAllv(const char *const *action) {
835   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
836         0 allToAllV 100 1 7 10 12 100 1 70 10 5
837      where:
838         1) 100 is the size of the send buffer *sizeof(int),
839         2) 1 7 10 12 is the sendcounts array
840         3) 100*sizeof(int) is the size of the receiver buffer
841         4)  1 70 10 5 is the recvcounts array
842   */
843   double clock = smpi_process_simulated_elapsed();
844
845   int comm_size = MPI_COMM_WORLD->size();
846   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
847   int sendcounts[comm_size];
848   int recvcounts[comm_size];
849   int senddisps[comm_size];
850   int recvdisps[comm_size];
851
852   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
853
854   int send_buf_size=parse_double(action[2]);
855   int recv_buf_size=parse_double(action[3+comm_size]);
856   if(action[4+2*comm_size] && action[5+2*comm_size]) {
857     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
858     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
859   }
860   else
861     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
862
863   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
864   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
865
866   for(int i=0;i<comm_size;i++) {
867     sendcounts[i] = atoi(action[i+3]);
868     recvcounts[i] = atoi(action[i+4+comm_size]);
869     senddisps[i] = 0;
870     recvdisps[i] = 0;
871   }
872
873   int rank = smpi_process_index();
874   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
875   extra->type = TRACING_ALLTOALLV;
876   extra->recvcounts= xbt_new(int, comm_size);
877   extra->sendcounts= xbt_new(int, comm_size);
878   extra->num_processes = comm_size;
879
880   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
881     extra->send_size += sendcounts[i];
882     extra->sendcounts[i] = sendcounts[i];
883     extra->recv_size += recvcounts[i];
884     extra->recvcounts[i] = recvcounts[i];
885   }
886   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
887   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
888
889   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
890
891   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
892                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
893
894   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
895   log_timed_action (action, clock);
896 }
897
898 void smpi_replay_run(int *argc, char***argv){
899   /* First initializes everything */
900   smpi_process_init(argc, argv);
901   smpi_process_mark_as_initialized();
902   smpi_process_set_replaying(true);
903
904   int rank = smpi_process_index();
905   TRACE_smpi_init(rank);
906   TRACE_smpi_computing_init(rank);
907   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
908   extra->type = TRACING_INIT;
909   char *operation =bprintf("%s_init",__FUNCTION__);
910   TRACE_smpi_collective_in(rank, -1, operation, extra);
911   TRACE_smpi_collective_out(rank, -1, operation);
912   xbt_free(operation);
913
914   if (_xbt_replay_action_init()==0) {
915     xbt_replay_action_register("init",       action_init);
916     xbt_replay_action_register("finalize",   action_finalize);
917     xbt_replay_action_register("comm_size",  action_comm_size);
918     xbt_replay_action_register("comm_split", action_comm_split);
919     xbt_replay_action_register("comm_dup",   action_comm_dup);
920     xbt_replay_action_register("send",       action_send);
921     xbt_replay_action_register("Isend",      action_Isend);
922     xbt_replay_action_register("recv",       action_recv);
923     xbt_replay_action_register("Irecv",      action_Irecv);
924     xbt_replay_action_register("test",       action_test);
925     xbt_replay_action_register("wait",       action_wait);
926     xbt_replay_action_register("waitAll",    action_waitall);
927     xbt_replay_action_register("barrier",    action_barrier);
928     xbt_replay_action_register("bcast",      action_bcast);
929     xbt_replay_action_register("reduce",     action_reduce);
930     xbt_replay_action_register("allReduce",  action_allReduce);
931     xbt_replay_action_register("allToAll",   action_allToAll);
932     xbt_replay_action_register("allToAllV",  action_allToAllv);
933     xbt_replay_action_register("gather",  action_gather);
934     xbt_replay_action_register("gatherV",  action_gatherv);
935     xbt_replay_action_register("allGather",  action_allgather);
936     xbt_replay_action_register("allGatherV",  action_allgatherv);
937     xbt_replay_action_register("reduceScatter",  action_reducescatter);
938     xbt_replay_action_register("compute",    action_compute);
939   }
940
941   //if we have a delayed start, sleep here.
942   if(*argc>2){
943     char *endptr;
944     double value = strtod((*argv)[2], &endptr);
945     if (*endptr != '\0')
946       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
947     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
948     smpi_execute_flops(value);
949   } else {
950     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
951     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
952     smpi_execute_flops(0.0);
953   }
954
955   /* Actually run the replay */
956   xbt_replay_action_runner(*argc, *argv);
957
958   /* and now, finalize everything */
959   /* One active process will stop. Decrease the counter*/
960   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
961   if (!get_reqq_self()->empty()){
962     unsigned int count_requests=get_reqq_self()->size();
963     MPI_Request requests[count_requests];
964     MPI_Status status[count_requests];
965     unsigned int i=0;
966
967     for (auto req: *get_reqq_self()){
968       requests[i] = req;
969       i++;
970     }
971     smpi_mpi_waitall(count_requests, requests, status);
972   }
973   delete get_reqq_self();
974   active_processes--;
975
976   if(active_processes==0){
977     /* Last process alive speaking: end the simulated timer */
978     XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
979     _xbt_replay_action_exit();
980     xbt_free(sendbuffer);
981     xbt_free(recvbuffer);
982   }
983
984   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
985   extra_fin->type = TRACING_FINALIZE;
986   operation =bprintf("%s_finalize",__FUNCTION__);
987   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
988
989   smpi_process_finalize();
990
991   TRACE_smpi_collective_out(rank, -1, operation);
992   TRACE_smpi_finalize(smpi_process_index());
993   smpi_process_destroy();
994   xbt_free(operation);
995 }