Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix for [#136] on github.
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11 #include <unordered_map>
12 #include <vector>
13
14 #define KEY_SIZE (sizeof(int) * 2 + 1)
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
17
18 int communicator_size = 0;
19 static int active_processes = 0;
20 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
21
22 MPI_Datatype MPI_DEFAULT_TYPE;
23 MPI_Datatype MPI_CURRENT_TYPE;
24
25 static int sendbuffer_size=0;
26 char* sendbuffer=nullptr;
27 static int recvbuffer_size=0;
28 char* recvbuffer=nullptr;
29
30 static void log_timed_action (const char *const *action, double clock){
31   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
32     char *name = xbt_str_join_array(action, " ");
33     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34     xbt_free(name);
35   }
36 }
37
38 static std::vector<MPI_Request>* get_reqq_self()
39 {
40   return reqq.at(smpi_process_index());
41 }
42
43 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
44 {
45    reqq.insert({smpi_process_index(), mpi_request});
46 }
47
48 //allocate a single buffer for all sends, growing it if needed
49 void* smpi_get_tmp_sendbuffer(int size)
50 {
51   if (!smpi_process_get_replaying())
52     return xbt_malloc(size);
53   if (sendbuffer_size<size){
54     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
55     sendbuffer_size=size;
56   }
57   return sendbuffer;
58 }
59
60 //allocate a single buffer for all recv
61 void* smpi_get_tmp_recvbuffer(int size){
62   if (!smpi_process_get_replaying())
63   return xbt_malloc(size);
64   if (recvbuffer_size<size){
65     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
66     recvbuffer_size=size;
67   }
68   return recvbuffer;
69 }
70
71 void smpi_free_tmp_buffer(void* buf){
72   if (!smpi_process_get_replaying())
73     xbt_free(buf);
74 }
75
76 /* Helper function */
77 static double parse_double(const char *string)
78 {
79   double value;
80   char *endptr;
81   value = strtod(string, &endptr);
82   if (*endptr != '\0')
83     THROWF(unknown_error, 0, "%s is not a double", string);
84   return value;
85 }
86
87 static MPI_Datatype decode_datatype(const char *const action)
88 {
89 // Declared datatypes,
90   switch(atoi(action)) {
91     case 0:
92       MPI_CURRENT_TYPE=MPI_DOUBLE;
93       break;
94     case 1:
95       MPI_CURRENT_TYPE=MPI_INT;
96       break;
97     case 2:
98       MPI_CURRENT_TYPE=MPI_CHAR;
99       break;
100     case 3:
101       MPI_CURRENT_TYPE=MPI_SHORT;
102       break;
103     case 4:
104       MPI_CURRENT_TYPE=MPI_LONG;
105       break;
106     case 5:
107       MPI_CURRENT_TYPE=MPI_FLOAT;
108       break;
109     case 6:
110       MPI_CURRENT_TYPE=MPI_BYTE;
111       break;
112     default:
113       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
114   }
115    return MPI_CURRENT_TYPE;
116 }
117
118
119 const char* encode_datatype(MPI_Datatype datatype, int* known)
120 {
121   //default type for output is set to MPI_BYTE
122   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
123   if(known!=nullptr)
124     *known=1;
125   if (datatype==MPI_BYTE){
126       return "";
127   }
128   if(datatype==MPI_DOUBLE)
129       return "0";
130   if(datatype==MPI_INT)
131       return "1";
132   if(datatype==MPI_CHAR)
133       return "2";
134   if(datatype==MPI_SHORT)
135       return "3";
136   if(datatype==MPI_LONG)
137     return "4";
138   if(datatype==MPI_FLOAT)
139       return "5";
140   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
141   if(known!=nullptr)
142     *known=0;
143   // default - not implemented.
144   // do not warn here as we pass in this function even for other trace formats
145   return "-1";
146 }
147
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
149     int i=0;\
150     while(action[i]!=nullptr)\
151      i++;\
152     if(i<mandatory+2)                                           \
153     THROWF(arg_error, 0, "%s replay failed.\n" \
154           "%d items were given on the line. First two should be process_id and action.  " \
155           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
157   }
158
159 static void action_init(const char *const *action)
160 {
161   XBT_DEBUG("Initialize the counters");
162   CHECK_ACTION_PARAMS(action, 0, 1)
163   if(action[2]) 
164     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
165   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process_simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* do nothing */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183
184   communicator_size = parse_double(action[2]);
185   log_timed_action (action, clock);
186 }
187
188 static void action_comm_split(const char *const *action)
189 {
190   double clock = smpi_process_simulated_elapsed();
191
192   log_timed_action (action, clock);
193 }
194
195 static void action_comm_dup(const char *const *action)
196 {
197   double clock = smpi_process_simulated_elapsed();
198
199   log_timed_action (action, clock);
200 }
201
202 static void action_compute(const char *const *action)
203 {
204   CHECK_ACTION_PARAMS(action, 1, 0)
205   double clock = smpi_process_simulated_elapsed();
206   double flops= parse_double(action[2]);
207   int rank = smpi_process_index();
208   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
209   extra->type=TRACING_COMPUTING;
210   extra->comp_size=flops;
211   TRACE_smpi_computing_in(rank, extra);
212
213   smpi_execute_flops(flops);
214
215   TRACE_smpi_computing_out(rank);
216   log_timed_action (action, clock);
217 }
218
219 static void action_send(const char *const *action)
220 {
221   CHECK_ACTION_PARAMS(action, 2, 1)
222   int to = atoi(action[2]);
223   double size=parse_double(action[3]);
224   double clock = smpi_process_simulated_elapsed();
225
226   if(action[4])
227     MPI_CURRENT_TYPE=decode_datatype(action[4]);
228   else
229     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
230
231   int rank = smpi_process_index();
232
233   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
234   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
235   extra->type = TRACING_SEND;
236   extra->send_size = size;
237   extra->src = rank;
238   extra->dst = dst_traced;
239   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
240   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
241   if (!TRACE_smpi_view_internals()) {
242     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
243   }
244
245   smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
246
247   log_timed_action (action, clock);
248
249   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
250 }
251
252 static void action_Isend(const char *const *action)
253 {
254   CHECK_ACTION_PARAMS(action, 2, 1)
255   int to = atoi(action[2]);
256   double size=parse_double(action[3]);
257   double clock = smpi_process_simulated_elapsed();
258   MPI_Request request;
259
260   if(action[4]) 
261     MPI_CURRENT_TYPE=decode_datatype(action[4]);
262   else 
263     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
264
265   int rank = smpi_process_index();
266   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
267   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
268   extra->type = TRACING_ISEND;
269   extra->send_size = size;
270   extra->src = rank;
271   extra->dst = dst_traced;
272   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
273   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
274   if (!TRACE_smpi_view_internals()) {
275     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
276   }
277
278   request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
279
280   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
281   request->send = 1;
282
283   get_reqq_self()->push_back(request);
284
285   log_timed_action (action, clock);
286 }
287
288 static void action_recv(const char *const *action) {
289   CHECK_ACTION_PARAMS(action, 2, 1)
290   int from = atoi(action[2]);
291   double size=parse_double(action[3]);
292   double clock = smpi_process_simulated_elapsed();
293   MPI_Status status;
294
295   if(action[4]) 
296     MPI_CURRENT_TYPE=decode_datatype(action[4]);
297   else 
298     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
299
300   int rank = smpi_process_index();
301   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
302
303   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
304   extra->type = TRACING_RECV;
305   extra->send_size = size;
306   extra->src = src_traced;
307   extra->dst = rank;
308   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
309   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
310
311   //unknown size from the receiver point of view
312   if(size<=0.0){
313     smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
314     size=status.count;
315   }
316
317   smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
318
319   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
320   if (!TRACE_smpi_view_internals()) {
321     TRACE_smpi_recv(rank, src_traced, rank, 0);
322   }
323
324   log_timed_action (action, clock);
325 }
326
327 static void action_Irecv(const char *const *action)
328 {
329   CHECK_ACTION_PARAMS(action, 2, 1)
330   int from = atoi(action[2]);
331   double size=parse_double(action[3]);
332   double clock = smpi_process_simulated_elapsed();
333   MPI_Request request;
334
335   if(action[4]) 
336     MPI_CURRENT_TYPE=decode_datatype(action[4]);
337   else 
338     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
339
340   int rank = smpi_process_index();
341   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
342   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
343   extra->type = TRACING_IRECV;
344   extra->send_size = size;
345   extra->src = src_traced;
346   extra->dst = rank;
347   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
348   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
349   MPI_Status status;
350   //unknow size from the receiver pov
351   if(size<=0.0){
352       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
353       size=status.count;
354   }
355
356   request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
357
358   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
359   request->recv = 1;
360   get_reqq_self()->push_back(request);
361
362   log_timed_action (action, clock);
363 }
364
365 static void action_test(const char *const *action){
366   CHECK_ACTION_PARAMS(action, 0, 0)
367   double clock = smpi_process_simulated_elapsed();
368   MPI_Status status;
369   int flag = true;
370
371   MPI_Request request = get_reqq_self()->back();
372   get_reqq_self()->pop_back();
373   //if request is null here, this may mean that a previous test has succeeded 
374   //Different times in traced application and replayed version may lead to this 
375   //In this case, ignore the extra calls.
376   if(request!=nullptr){
377     int rank = smpi_process_index();
378     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
379     extra->type=TRACING_TEST;
380     TRACE_smpi_testing_in(rank, extra);
381
382     flag = smpi_mpi_test(&request, &status);
383
384     XBT_DEBUG("MPI_Test result: %d", flag);
385     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
386     get_reqq_self()->push_back(request);
387
388     TRACE_smpi_testing_out(rank);
389   }
390   log_timed_action (action, clock);
391 }
392
393 static void action_wait(const char *const *action){
394   CHECK_ACTION_PARAMS(action, 0, 0)
395   double clock = smpi_process_simulated_elapsed();
396   MPI_Status status;
397
398   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
399       xbt_str_join_array(action," "));
400   MPI_Request request = get_reqq_self()->back();
401   get_reqq_self()->pop_back();
402
403   if (request==nullptr){
404     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
405     return;
406   }
407
408   int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
409
410   MPI_Group group = smpi_comm_group(request->comm);
411   int src_traced = smpi_group_rank(group, request->src);
412   int dst_traced = smpi_group_rank(group, request->dst);
413   int is_wait_for_receive = request->recv;
414   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
415   extra->type = TRACING_WAIT;
416   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
417
418   smpi_mpi_wait(&request, &status);
419
420   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
421   if (is_wait_for_receive)
422     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
423   log_timed_action (action, clock);
424 }
425
426 static void action_waitall(const char *const *action){
427   CHECK_ACTION_PARAMS(action, 0, 0)
428   double clock = smpi_process_simulated_elapsed();
429   unsigned int count_requests=get_reqq_self()->size();
430
431   if (count_requests>0) {
432     MPI_Status status[count_requests];
433
434    int rank_traced = smpi_process_index();
435    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
436    extra->type = TRACING_WAITALL;
437    extra->send_size=count_requests;
438    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
439    int* recvs_snd= xbt_new0(int,count_requests);
440    int* recvs_rcv= xbt_new0(int,count_requests);
441    unsigned int i=0;
442    for (auto req : *(get_reqq_self())){
443      if (req && req->recv){
444        recvs_snd[i]=req->src;
445        recvs_rcv[i]=req->dst;
446      }else
447        recvs_snd[i]=-100;
448      i++;
449    }
450    smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
451
452    for (i=0; i<count_requests;i++){
453      if (recvs_snd[i]!=-100)
454        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
455    }
456    xbt_free(recvs_rcv);
457    xbt_free(recvs_snd);
458    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
459   }
460   log_timed_action (action, clock);
461 }
462
463 static void action_barrier(const char *const *action){
464   double clock = smpi_process_simulated_elapsed();
465   int rank = smpi_process_index();
466   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
467   extra->type = TRACING_BARRIER;
468   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
469
470   mpi_coll_barrier_fun(MPI_COMM_WORLD);
471
472   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
473   log_timed_action (action, clock);
474 }
475
476 static void action_bcast(const char *const *action)
477 {
478   CHECK_ACTION_PARAMS(action, 1, 2)
479   double size = parse_double(action[2]);
480   double clock = smpi_process_simulated_elapsed();
481   int root=0;
482   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
483   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
484
485   if(action[3]) {
486     root= atoi(action[3]);
487     if(action[4]) {
488       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
489     }
490   }
491
492   int rank = smpi_process_index();
493   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
494
495   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
496   extra->type = TRACING_BCAST;
497   extra->send_size = size;
498   extra->root = root_traced;
499   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
500   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
501   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
502
503   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
504
505   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
506   log_timed_action (action, clock);
507 }
508
509 static void action_reduce(const char *const *action)
510 {
511   CHECK_ACTION_PARAMS(action, 2, 2)
512   double comm_size = parse_double(action[2]);
513   double comp_size = parse_double(action[3]);
514   double clock = smpi_process_simulated_elapsed();
515   int root=0;
516   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
517
518   if(action[4]) {
519     root= atoi(action[4]);
520     if(action[5]) {
521       MPI_CURRENT_TYPE=decode_datatype(action[5]);
522     }
523   }
524
525   int rank = smpi_process_index();
526   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
527   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
528   extra->type = TRACING_REDUCE;
529   extra->send_size = comm_size;
530   extra->comp_size = comp_size;
531   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
532   extra->root = root_traced;
533
534   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
535
536   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
537   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
538   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
539   smpi_execute_flops(comp_size);
540
541   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
542   log_timed_action (action, clock);
543 }
544
545 static void action_allReduce(const char *const *action) {
546   CHECK_ACTION_PARAMS(action, 2, 1)
547   double comm_size = parse_double(action[2]);
548   double comp_size = parse_double(action[3]);
549
550   if(action[4])
551     MPI_CURRENT_TYPE=decode_datatype(action[4]);
552   else
553     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
554
555   double clock = smpi_process_simulated_elapsed();
556   int rank = smpi_process_index();
557   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
558   extra->type = TRACING_ALLREDUCE;
559   extra->send_size = comm_size;
560   extra->comp_size = comp_size;
561   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
562   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
563
564   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
565   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
566   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
567   smpi_execute_flops(comp_size);
568
569   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
570   log_timed_action (action, clock);
571 }
572
573 static void action_allToAll(const char *const *action) {
574   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
575                                      //two optional (corresponding datatypes)
576   double clock = smpi_process_simulated_elapsed();
577   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
578   int send_size = parse_double(action[2]);
579   int recv_size = parse_double(action[3]);
580   MPI_Datatype MPI_CURRENT_TYPE2;
581
582   if(action[4] && action[5]) {
583     MPI_CURRENT_TYPE=decode_datatype(action[4]);
584     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
585   }
586   else{
587     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
588     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
589   }
590
591   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
592   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
593
594   int rank = smpi_process_index();
595   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
596   extra->type = TRACING_ALLTOALL;
597   extra->send_size = send_size;
598   extra->recv_size = recv_size;
599   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
600   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
601
602   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
603
604   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
605
606   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
607   log_timed_action (action, clock);
608 }
609
610 static void action_gather(const char *const *action) {
611   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
612  0 gather 68 68 0 0 0
613
614   where: 
615   1) 68 is the sendcounts
616   2) 68 is the recvcounts
617   3) 0 is the root node
618   4) 0 is the send datatype id, see decode_datatype()
619   5) 0 is the recv datatype id, see decode_datatype()
620   */
621   CHECK_ACTION_PARAMS(action, 2, 3)
622   double clock = smpi_process_simulated_elapsed();
623   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
624   int send_size = parse_double(action[2]);
625   int recv_size = parse_double(action[3]);
626   MPI_Datatype MPI_CURRENT_TYPE2;
627   if(action[4] && action[5]) {
628     MPI_CURRENT_TYPE=decode_datatype(action[5]);
629     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
630   } else {
631     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
632     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
633   }
634   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635   void *recv = nullptr;
636   int root=0;
637   if(action[4])
638     root=atoi(action[4]);
639   int rank = smpi_comm_rank(MPI_COMM_WORLD);
640
641   if(rank==root)
642     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
643
644   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
645   extra->type = TRACING_GATHER;
646   extra->send_size = send_size;
647   extra->recv_size = recv_size;
648   extra->root = root;
649   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
650   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
651
652   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
653
654   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
655
656   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
657   log_timed_action (action, clock);
658 }
659
660 static void action_gatherv(const char *const *action) {
661   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
662  0 gather 68 68 10 10 10 0 0 0
663
664   where:
665   1) 68 is the sendcount
666   2) 68 10 10 10 is the recvcounts
667   3) 0 is the root node
668   4) 0 is the send datatype id, see decode_datatype()
669   5) 0 is the recv datatype id, see decode_datatype()
670   */
671
672   double clock = smpi_process_simulated_elapsed();
673   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
674   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
675   int send_size = parse_double(action[2]);
676   int *disps = xbt_new0(int, comm_size);
677   int *recvcounts = xbt_new0(int, comm_size);
678   int i=0,recv_sum=0;
679
680   MPI_Datatype MPI_CURRENT_TYPE2;
681   if(action[4+comm_size] && action[5+comm_size]) {
682     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
683     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
684   } else {
685     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
686     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
687   }
688   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
689   void *recv = nullptr;
690   for(i=0;i<comm_size;i++) {
691     recvcounts[i] = atoi(action[i+3]);
692     recv_sum=recv_sum+recvcounts[i];
693     disps[i] = 0;
694   }
695
696   int root=atoi(action[3+comm_size]);
697   int rank = smpi_comm_rank(MPI_COMM_WORLD);
698
699   if(rank==root)
700     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
701
702   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
703   extra->type = TRACING_GATHERV;
704   extra->send_size = send_size;
705   extra->recvcounts= xbt_new(int,comm_size);
706   for(i=0; i< comm_size; i++)//copy data to avoid bad free
707     extra->recvcounts[i] = recvcounts[i];
708   extra->root = root;
709   extra->num_processes = comm_size;
710   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
711   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
712
713   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
714
715   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
716
717   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
718   log_timed_action (action, clock);
719   xbt_free(recvcounts);
720   xbt_free(disps);
721 }
722
723 static void action_reducescatter(const char *const *action) {
724  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
725 0 reduceScatter 275427 275427 275427 204020 11346849 0
726
727   where: 
728   1) The first four values after the name of the action declare the recvcounts array
729   2) The value 11346849 is the amount of instructions
730   3) The last value corresponds to the datatype, see decode_datatype().
731
732   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
733   double clock = smpi_process_simulated_elapsed();
734   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
735   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
736   int comp_size = parse_double(action[2+comm_size]);
737   int *recvcounts = xbt_new0(int, comm_size);  
738   int *disps = xbt_new0(int, comm_size);  
739   int i=0;
740   int rank = smpi_process_index();
741   int size = 0;
742   if(action[3+comm_size])
743     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
744   else
745     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
746
747   for(i=0;i<comm_size;i++) {
748     recvcounts[i] = atoi(action[i+2]);
749     disps[i] = 0;
750     size+=recvcounts[i];
751   }
752
753   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
754   extra->type = TRACING_REDUCE_SCATTER;
755   extra->send_size = 0;
756   extra->recvcounts= xbt_new(int, comm_size);
757   for(i=0; i< comm_size; i++)//copy data to avoid bad free
758     extra->recvcounts[i] = recvcounts[i];
759   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
760   extra->comp_size = comp_size;
761   extra->num_processes = comm_size;
762
763   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
764
765   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
766   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
767    
768    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
769    smpi_execute_flops(comp_size);
770
771   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
772   xbt_free(recvcounts);
773   xbt_free(disps);
774   log_timed_action (action, clock);
775 }
776
777 static void action_allgather(const char *const *action) {
778   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
779   0 allGather 275427 275427
780
781   where: 
782   1) 275427 is the sendcount
783   2) 275427 is the recvcount
784   3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().  */
785   double clock = smpi_process_simulated_elapsed();
786
787   CHECK_ACTION_PARAMS(action, 2, 2)
788   int sendcount=atoi(action[2]); 
789   int recvcount=atoi(action[3]); 
790
791   MPI_Datatype MPI_CURRENT_TYPE2;
792
793   if(action[4] && action[5]) {
794     MPI_CURRENT_TYPE = decode_datatype(action[4]);
795     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
796   } else {
797     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
798     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
799   }
800   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
801   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
802
803   int rank = smpi_process_index();
804   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
805   extra->type = TRACING_ALLGATHER;
806   extra->send_size = sendcount;
807   extra->recv_size= recvcount;
808   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
809   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
810   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
811
812   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
813
814   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
815
816   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
817   log_timed_action (action, clock);
818 }
819
820 static void action_allgatherv(const char *const *action) {
821   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
822 0 allGatherV 275427 275427 275427 275427 204020
823
824   where: 
825   1) 275427 is the sendcount
826   2) The next four elements declare the recvcounts array
827   3) No more values mean that the datatype for sent and receive buffer
828   is the default one, see decode_datatype(). */
829   double clock = smpi_process_simulated_elapsed();
830
831   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
832   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
833   int i=0;
834   int sendcount=atoi(action[2]);
835   int *recvcounts = xbt_new0(int, comm_size);  
836   int *disps = xbt_new0(int, comm_size);  
837   int recv_sum=0;  
838   MPI_Datatype MPI_CURRENT_TYPE2;
839
840   if(action[3+comm_size] && action[4+comm_size]) {
841     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
842     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
843   } else {
844     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
845     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
846   }
847   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
848
849   for(i=0;i<comm_size;i++) {
850     recvcounts[i] = atoi(action[i+3]);
851     recv_sum=recv_sum+recvcounts[i];
852   }
853   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
854
855   int rank = smpi_process_index();
856   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
857   extra->type = TRACING_ALLGATHERV;
858   extra->send_size = sendcount;
859   extra->recvcounts= xbt_new(int, comm_size);
860   for(i=0; i< comm_size; i++)//copy data to avoid bad free
861     extra->recvcounts[i] = recvcounts[i];
862   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
863   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
864   extra->num_processes = comm_size;
865
866   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
867
868   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
869                           MPI_COMM_WORLD);
870
871   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
872   log_timed_action (action, clock);
873   xbt_free(recvcounts);
874   xbt_free(disps);
875 }
876
877 static void action_allToAllv(const char *const *action) {
878   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
879   0 allToAllV 100 1 7 10 12 100 1 70 10 5
880
881   where: 
882   1) 100 is the size of the send buffer *sizeof(int),
883   2) 1 7 10 12 is the sendcounts array
884   3) 100*sizeof(int) is the size of the receiver buffer
885   4)  1 70 10 5 is the recvcounts array */
886   double clock = smpi_process_simulated_elapsed();
887
888   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
889   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
890   int *sendcounts = xbt_new0(int, comm_size);  
891   int *recvcounts = xbt_new0(int, comm_size);  
892   int *senddisps = xbt_new0(int, comm_size);  
893   int *recvdisps = xbt_new0(int, comm_size);  
894
895   MPI_Datatype MPI_CURRENT_TYPE2;
896
897   int send_buf_size=parse_double(action[2]);
898   int recv_buf_size=parse_double(action[3+comm_size]);
899   if(action[4+2*comm_size] && action[5+2*comm_size]) {
900     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
901     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
902   }
903   else{
904     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
905     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
906   }
907
908   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
909   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
910
911   for(int i=0;i<comm_size;i++) {
912     sendcounts[i] = atoi(action[i+3]);
913     recvcounts[i] = atoi(action[i+4+comm_size]);
914   }
915
916   int rank = smpi_process_index();
917   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
918   extra->type = TRACING_ALLTOALLV;
919   extra->recvcounts= xbt_new(int, comm_size);
920   extra->sendcounts= xbt_new(int, comm_size);
921   extra->num_processes = comm_size;
922
923   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
924     extra->send_size += sendcounts[i];
925     extra->sendcounts[i] = sendcounts[i];
926     extra->recv_size += recvcounts[i];
927     extra->recvcounts[i] = recvcounts[i];
928   }
929   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
930   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
931
932   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
933
934   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
935                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
936
937   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
938   log_timed_action (action, clock);
939   xbt_free(sendcounts);
940   xbt_free(recvcounts);
941   xbt_free(senddisps);
942   xbt_free(recvdisps);
943 }
944
945 void smpi_replay_run(int *argc, char***argv){
946   /* First initializes everything */
947   smpi_process_init(argc, argv);
948   smpi_process_mark_as_initialized();
949   smpi_process_set_replaying(true);
950
951   int rank = smpi_process_index();
952   TRACE_smpi_init(rank);
953   TRACE_smpi_computing_init(rank);
954   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
955   extra->type = TRACING_INIT;
956   char *operation =bprintf("%s_init",__FUNCTION__);
957   TRACE_smpi_collective_in(rank, -1, operation, extra);
958   TRACE_smpi_collective_out(rank, -1, operation);
959   xbt_free(operation);
960
961   if (_xbt_replay_action_init()==0) {
962     xbt_replay_action_register("init",       action_init);
963     xbt_replay_action_register("finalize",   action_finalize);
964     xbt_replay_action_register("comm_size",  action_comm_size);
965     xbt_replay_action_register("comm_split", action_comm_split);
966     xbt_replay_action_register("comm_dup",   action_comm_dup);
967     xbt_replay_action_register("send",       action_send);
968     xbt_replay_action_register("Isend",      action_Isend);
969     xbt_replay_action_register("recv",       action_recv);
970     xbt_replay_action_register("Irecv",      action_Irecv);
971     xbt_replay_action_register("test",       action_test);
972     xbt_replay_action_register("wait",       action_wait);
973     xbt_replay_action_register("waitAll",    action_waitall);
974     xbt_replay_action_register("barrier",    action_barrier);
975     xbt_replay_action_register("bcast",      action_bcast);
976     xbt_replay_action_register("reduce",     action_reduce);
977     xbt_replay_action_register("allReduce",  action_allReduce);
978     xbt_replay_action_register("allToAll",   action_allToAll);
979     xbt_replay_action_register("allToAllV",  action_allToAllv);
980     xbt_replay_action_register("gather",  action_gather);
981     xbt_replay_action_register("gatherV",  action_gatherv);
982     xbt_replay_action_register("allGather",  action_allgather);
983     xbt_replay_action_register("allGatherV",  action_allgatherv);
984     xbt_replay_action_register("reduceScatter",  action_reducescatter);
985     xbt_replay_action_register("compute",    action_compute);
986   }
987
988   //if we have a delayed start, sleep here.
989   if(*argc>2){
990     char *endptr;
991     double value = strtod((*argv)[2], &endptr);
992     if (*endptr != '\0')
993       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
994     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
995     smpi_execute_flops(value);
996   } else {
997     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
998     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
999     smpi_execute_flops(0.0);
1000   }
1001
1002   /* Actually run the replay */
1003   xbt_replay_action_runner(*argc, *argv);
1004
1005   /* and now, finalize everything */
1006   double sim_time= 1.;
1007   /* One active process will stop. Decrease the counter*/
1008   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
1009   if (!get_reqq_self()->empty()){
1010     unsigned int count_requests=get_reqq_self()->size();
1011     MPI_Request requests[count_requests];
1012     MPI_Status status[count_requests];
1013     unsigned int i=0;
1014
1015     for (auto req: *get_reqq_self()){
1016       requests[i] = req;
1017       i++;
1018     }
1019     smpi_mpi_waitall(count_requests, requests, status);
1020     active_processes--;
1021   } else {
1022     active_processes--;
1023   }
1024
1025   if(active_processes==0){
1026     /* Last process alive speaking */
1027     /* end the simulated timer */
1028     sim_time = smpi_process_simulated_elapsed();
1029     XBT_INFO("Simulation time %f", sim_time);
1030     _xbt_replay_action_exit();
1031     xbt_free(sendbuffer);
1032     xbt_free(recvbuffer);
1033   }
1034
1035   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1036   extra_fin->type = TRACING_FINALIZE;
1037   operation =bprintf("%s_finalize",__FUNCTION__);
1038   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1039
1040   smpi_process_finalize();
1041
1042   TRACE_smpi_collective_out(rank, -1, operation);
1043   TRACE_smpi_finalize(smpi_process_index());
1044   smpi_process_destroy();
1045   xbt_free(operation);
1046 }