Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
remove dict and dynar
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11 #include <unordered_map>
12 #include <vector>
13
14 #define KEY_SIZE (sizeof(int) * 2 + 1)
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
17
18 int communicator_size = 0;
19 static int active_processes = 0;
20 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
21
22 MPI_Datatype MPI_DEFAULT_TYPE;
23 MPI_Datatype MPI_CURRENT_TYPE;
24
25 static int sendbuffer_size=0;
26 char* sendbuffer=nullptr;
27 static int recvbuffer_size=0;
28 char* recvbuffer=nullptr;
29
30 static void log_timed_action (const char *const *action, double clock){
31   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
32     char *name = xbt_str_join_array(action, " ");
33     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34     xbt_free(name);
35   }
36 }
37
38 static std::vector<MPI_Request>* get_reqq_self()
39 {
40   return reqq.at(smpi_process_index());
41 }
42
43 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
44 {
45    reqq.insert({smpi_process_index(), mpi_request});
46 }
47
48 //allocate a single buffer for all sends, growing it if needed
49 void* smpi_get_tmp_sendbuffer(int size)
50 {
51   if (!smpi_process_get_replaying())
52     return xbt_malloc(size);
53   if (sendbuffer_size<size){
54     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
55     sendbuffer_size=size;
56   }
57   return sendbuffer;
58 }
59
60 //allocate a single buffer for all recv
61 void* smpi_get_tmp_recvbuffer(int size){
62   if (!smpi_process_get_replaying())
63   return xbt_malloc(size);
64   if (recvbuffer_size<size){
65     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
66     recvbuffer_size=size;
67   }
68   return recvbuffer;
69 }
70
71 void smpi_free_tmp_buffer(void* buf){
72   if (!smpi_process_get_replaying())
73     xbt_free(buf);
74 }
75
76 /* Helper function */
77 static double parse_double(const char *string)
78 {
79   double value;
80   char *endptr;
81   value = strtod(string, &endptr);
82   if (*endptr != '\0')
83     THROWF(unknown_error, 0, "%s is not a double", string);
84   return value;
85 }
86
87 static MPI_Datatype decode_datatype(const char *const action)
88 {
89 // Declared datatypes,
90   switch(atoi(action)) {
91     case 0:
92       MPI_CURRENT_TYPE=MPI_DOUBLE;
93       break;
94     case 1:
95       MPI_CURRENT_TYPE=MPI_INT;
96       break;
97     case 2:
98       MPI_CURRENT_TYPE=MPI_CHAR;
99       break;
100     case 3:
101       MPI_CURRENT_TYPE=MPI_SHORT;
102       break;
103     case 4:
104       MPI_CURRENT_TYPE=MPI_LONG;
105       break;
106     case 5:
107       MPI_CURRENT_TYPE=MPI_FLOAT;
108       break;
109     case 6:
110       MPI_CURRENT_TYPE=MPI_BYTE;
111       break;
112     default:
113       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
114   }
115    return MPI_CURRENT_TYPE;
116 }
117
118
119 const char* encode_datatype(MPI_Datatype datatype, int* known)
120 {
121   //default type for output is set to MPI_BYTE
122   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
123   if(known!=nullptr)
124     *known=1;
125   if (datatype==MPI_BYTE){
126       return "";
127   }
128   if(datatype==MPI_DOUBLE)
129       return "0";
130   if(datatype==MPI_INT)
131       return "1";
132   if(datatype==MPI_CHAR)
133       return "2";
134   if(datatype==MPI_SHORT)
135       return "3";
136   if(datatype==MPI_LONG)
137     return "4";
138   if(datatype==MPI_FLOAT)
139       return "5";
140   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
141   if(known!=nullptr)
142     *known=0;
143   // default - not implemented.
144   // do not warn here as we pass in this function even for other trace formats
145   return "-1";
146 }
147
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
149     int i=0;\
150     while(action[i]!=nullptr)\
151      i++;\
152     if(i<mandatory+2)                                           \
153     THROWF(arg_error, 0, "%s replay failed.\n" \
154           "%d items were given on the line. First two should be process_id and action.  " \
155           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
157   }
158
159 static void action_init(const char *const *action)
160 {
161   XBT_DEBUG("Initialize the counters");
162   CHECK_ACTION_PARAMS(action, 0, 1)
163   if(action[2]) 
164     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
165   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process_simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* do nothing */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183
184   communicator_size = parse_double(action[2]);
185   log_timed_action (action, clock);
186 }
187
188 static void action_comm_split(const char *const *action)
189 {
190   double clock = smpi_process_simulated_elapsed();
191
192   log_timed_action (action, clock);
193 }
194
195 static void action_comm_dup(const char *const *action)
196 {
197   double clock = smpi_process_simulated_elapsed();
198
199   log_timed_action (action, clock);
200 }
201
202 static void action_compute(const char *const *action)
203 {
204   CHECK_ACTION_PARAMS(action, 1, 0)
205   double clock = smpi_process_simulated_elapsed();
206   double flops= parse_double(action[2]);
207   int rank = smpi_process_index();
208   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
209   extra->type=TRACING_COMPUTING;
210   extra->comp_size=flops;
211   TRACE_smpi_computing_in(rank, extra);
212
213   smpi_execute_flops(flops);
214
215   TRACE_smpi_computing_out(rank);
216   log_timed_action (action, clock);
217 }
218
219 static void action_send(const char *const *action)
220 {
221   CHECK_ACTION_PARAMS(action, 2, 1)
222   int to = atoi(action[2]);
223   double size=parse_double(action[3]);
224   double clock = smpi_process_simulated_elapsed();
225
226   if(action[4])
227     MPI_CURRENT_TYPE=decode_datatype(action[4]);
228   else
229     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
230
231   int rank = smpi_process_index();
232
233   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
234   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
235   extra->type = TRACING_SEND;
236   extra->send_size = size;
237   extra->src = rank;
238   extra->dst = dst_traced;
239   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
240   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
241   if (!TRACE_smpi_view_internals()) {
242     TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
243   }
244
245   smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
246
247   log_timed_action (action, clock);
248
249   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
250 }
251
252 static void action_Isend(const char *const *action)
253 {
254   CHECK_ACTION_PARAMS(action, 2, 1)
255   int to = atoi(action[2]);
256   double size=parse_double(action[3]);
257   double clock = smpi_process_simulated_elapsed();
258   MPI_Request request;
259
260   if(action[4]) 
261     MPI_CURRENT_TYPE=decode_datatype(action[4]);
262   else 
263     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
264
265   int rank = smpi_process_index();
266   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
267   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
268   extra->type = TRACING_ISEND;
269   extra->send_size = size;
270   extra->src = rank;
271   extra->dst = dst_traced;
272   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
273   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
274   if (!TRACE_smpi_view_internals()) {
275     TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
276   }
277
278   request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
279
280   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
281   request->send = 1;
282
283   get_reqq_self()->push_back(request);
284
285   log_timed_action (action, clock);
286 }
287
288 static void action_recv(const char *const *action) {
289   CHECK_ACTION_PARAMS(action, 2, 1)
290   int from = atoi(action[2]);
291   double size=parse_double(action[3]);
292   double clock = smpi_process_simulated_elapsed();
293   MPI_Status status;
294
295   if(action[4]) 
296     MPI_CURRENT_TYPE=decode_datatype(action[4]);
297   else 
298     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
299
300   int rank = smpi_process_index();
301   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
302
303   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
304   extra->type = TRACING_RECV;
305   extra->send_size = size;
306   extra->src = src_traced;
307   extra->dst = rank;
308   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
309   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
310
311   //unknow size from the receiver pov
312   if(size<=0.0){
313       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
314       size=status.count;
315   }
316
317   smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
318
319   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
320   if (!TRACE_smpi_view_internals()) {
321     TRACE_smpi_recv(rank, src_traced, rank);
322   }
323
324   log_timed_action (action, clock);
325 }
326
327 static void action_Irecv(const char *const *action)
328 {
329   CHECK_ACTION_PARAMS(action, 2, 1)
330   int from = atoi(action[2]);
331   double size=parse_double(action[3]);
332   double clock = smpi_process_simulated_elapsed();
333   MPI_Request request;
334
335   if(action[4]) 
336     MPI_CURRENT_TYPE=decode_datatype(action[4]);
337   else 
338     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
339
340   int rank = smpi_process_index();
341   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
342   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
343   extra->type = TRACING_IRECV;
344   extra->send_size = size;
345   extra->src = src_traced;
346   extra->dst = rank;
347   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
348   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
349   MPI_Status status;
350   //unknow size from the receiver pov
351   if(size<=0.0){
352       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
353       size=status.count;
354   }
355
356   request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
357
358   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
359   request->recv = 1;
360   get_reqq_self()->push_back(request);
361
362   log_timed_action (action, clock);
363 }
364
365 static void action_test(const char *const *action){
366   CHECK_ACTION_PARAMS(action, 0, 0)
367   double clock = smpi_process_simulated_elapsed();
368   MPI_Status status;
369   int flag = true;
370
371   MPI_Request request = get_reqq_self()->back();
372   get_reqq_self()->pop_back();
373   //if request is null here, this may mean that a previous test has succeeded 
374   //Different times in traced application and replayed version may lead to this 
375   //In this case, ignore the extra calls.
376   if(request!=nullptr){
377     int rank = smpi_process_index();
378     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
379     extra->type=TRACING_TEST;
380     TRACE_smpi_testing_in(rank, extra);
381
382     flag = smpi_mpi_test(&request, &status);
383
384     XBT_DEBUG("MPI_Test result: %d", flag);
385     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
386     get_reqq_self()->push_back(request);
387
388     TRACE_smpi_testing_out(rank);
389   }
390   log_timed_action (action, clock);
391 }
392
393 static void action_wait(const char *const *action){
394   CHECK_ACTION_PARAMS(action, 0, 0)
395   double clock = smpi_process_simulated_elapsed();
396   MPI_Status status;
397
398   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
399       xbt_str_join_array(action," "));
400   MPI_Request request = get_reqq_self()->back();
401   get_reqq_self()->pop_back();
402
403   if (request==nullptr){
404     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
405     return;
406   }
407
408   int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
409
410   MPI_Group group = smpi_comm_group(request->comm);
411   int src_traced = smpi_group_rank(group, request->src);
412   int dst_traced = smpi_group_rank(group, request->dst);
413   int is_wait_for_receive = request->recv;
414   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
415   extra->type = TRACING_WAIT;
416   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
417
418   smpi_mpi_wait(&request, &status);
419
420   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
421   if (is_wait_for_receive)
422     TRACE_smpi_recv(rank, src_traced, dst_traced);
423   log_timed_action (action, clock);
424 }
425
426 static void action_waitall(const char *const *action){
427   CHECK_ACTION_PARAMS(action, 0, 0)
428   double clock = smpi_process_simulated_elapsed();
429   unsigned int count_requests=get_reqq_self()->size();
430
431   if (count_requests>0) {
432     MPI_Status status[count_requests];
433
434    int rank_traced = smpi_process_index();
435    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
436    extra->type = TRACING_WAITALL;
437    extra->send_size=count_requests;
438    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
439
440    smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
441
442    for (auto req : *(get_reqq_self())){
443      if (req && req->recv)
444        TRACE_smpi_recv(rank_traced, req->src, req->dst);
445    }
446    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
447    set_reqq_self(new std::vector<MPI_Request>);
448   }
449   log_timed_action (action, clock);
450 }
451
452 static void action_barrier(const char *const *action){
453   double clock = smpi_process_simulated_elapsed();
454   int rank = smpi_process_index();
455   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
456   extra->type = TRACING_BARRIER;
457   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
458
459   mpi_coll_barrier_fun(MPI_COMM_WORLD);
460
461   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
462   log_timed_action (action, clock);
463 }
464
465 static void action_bcast(const char *const *action)
466 {
467   CHECK_ACTION_PARAMS(action, 1, 2)
468   double size = parse_double(action[2]);
469   double clock = smpi_process_simulated_elapsed();
470   int root=0;
471   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
472   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
473
474   if(action[3]) {
475     root= atoi(action[3]);
476     if(action[4]) {
477       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
478     }
479   }
480
481   int rank = smpi_process_index();
482   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
483
484   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
485   extra->type = TRACING_BCAST;
486   extra->send_size = size;
487   extra->root = root_traced;
488   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
489   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
490   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
491
492   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
493
494   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
495   log_timed_action (action, clock);
496 }
497
498 static void action_reduce(const char *const *action)
499 {
500   CHECK_ACTION_PARAMS(action, 2, 2)
501   double comm_size = parse_double(action[2]);
502   double comp_size = parse_double(action[3]);
503   double clock = smpi_process_simulated_elapsed();
504   int root=0;
505   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
506
507   if(action[4]) {
508     root= atoi(action[4]);
509     if(action[5]) {
510       MPI_CURRENT_TYPE=decode_datatype(action[5]);
511     }
512   }
513
514   int rank = smpi_process_index();
515   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
516   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
517   extra->type = TRACING_REDUCE;
518   extra->send_size = comm_size;
519   extra->comp_size = comp_size;
520   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
521   extra->root = root_traced;
522
523   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
524
525   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
526   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
527   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
528   smpi_execute_flops(comp_size);
529
530   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
531   log_timed_action (action, clock);
532 }
533
534 static void action_allReduce(const char *const *action) {
535   CHECK_ACTION_PARAMS(action, 2, 1)
536   double comm_size = parse_double(action[2]);
537   double comp_size = parse_double(action[3]);
538
539   if(action[4])
540     MPI_CURRENT_TYPE=decode_datatype(action[4]);
541   else
542     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
543
544   double clock = smpi_process_simulated_elapsed();
545   int rank = smpi_process_index();
546   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
547   extra->type = TRACING_ALLREDUCE;
548   extra->send_size = comm_size;
549   extra->comp_size = comp_size;
550   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
551   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
552
553   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
554   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
555   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
556   smpi_execute_flops(comp_size);
557
558   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
559   log_timed_action (action, clock);
560 }
561
562 static void action_allToAll(const char *const *action) {
563   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
564                                      //two optional (corresponding datatypes)
565   double clock = smpi_process_simulated_elapsed();
566   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
567   int send_size = parse_double(action[2]);
568   int recv_size = parse_double(action[3]);
569   MPI_Datatype MPI_CURRENT_TYPE2;
570
571   if(action[4] && action[5]) {
572     MPI_CURRENT_TYPE=decode_datatype(action[4]);
573     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
574   }
575   else{
576     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
577     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
578   }
579
580   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
581   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
582
583   int rank = smpi_process_index();
584   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
585   extra->type = TRACING_ALLTOALL;
586   extra->send_size = send_size;
587   extra->recv_size = recv_size;
588   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
589   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
590
591   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
592
593   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
594
595   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
596   log_timed_action (action, clock);
597 }
598
599 static void action_gather(const char *const *action) {
600   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
601  0 gather 68 68 0 0 0
602
603   where: 
604   1) 68 is the sendcounts
605   2) 68 is the recvcounts
606   3) 0 is the root node
607   4) 0 is the send datatype id, see decode_datatype()
608   5) 0 is the recv datatype id, see decode_datatype()
609   */
610   CHECK_ACTION_PARAMS(action, 2, 3)
611   double clock = smpi_process_simulated_elapsed();
612   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
613   int send_size = parse_double(action[2]);
614   int recv_size = parse_double(action[3]);
615   MPI_Datatype MPI_CURRENT_TYPE2;
616   if(action[4] && action[5]) {
617     MPI_CURRENT_TYPE=decode_datatype(action[5]);
618     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
619   } else {
620     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
621     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
622   }
623   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
624   void *recv = nullptr;
625   int root=0;
626   if(action[4])
627     root=atoi(action[4]);
628   int rank = smpi_comm_rank(MPI_COMM_WORLD);
629
630   if(rank==root)
631     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
632
633   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
634   extra->type = TRACING_GATHER;
635   extra->send_size = send_size;
636   extra->recv_size = recv_size;
637   extra->root = root;
638   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
639   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
640
641   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
642
643   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
644
645   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
646   log_timed_action (action, clock);
647 }
648
649 static void action_gatherv(const char *const *action) {
650   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
651  0 gather 68 68 10 10 10 0 0 0
652
653   where:
654   1) 68 is the sendcount
655   2) 68 10 10 10 is the recvcounts
656   3) 0 is the root node
657   4) 0 is the send datatype id, see decode_datatype()
658   5) 0 is the recv datatype id, see decode_datatype()
659   */
660
661   double clock = smpi_process_simulated_elapsed();
662   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
663   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
664   int send_size = parse_double(action[2]);
665   int *disps = xbt_new0(int, comm_size);
666   int *recvcounts = xbt_new0(int, comm_size);
667   int i=0,recv_sum=0;
668
669   MPI_Datatype MPI_CURRENT_TYPE2;
670   if(action[4+comm_size] && action[5+comm_size]) {
671     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
672     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
673   } else {
674     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
675     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
676   }
677   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
678   void *recv = nullptr;
679   for(i=0;i<comm_size;i++) {
680     recvcounts[i] = atoi(action[i+3]);
681     recv_sum=recv_sum+recvcounts[i];
682     disps[i] = 0;
683   }
684
685   int root=atoi(action[3+comm_size]);
686   int rank = smpi_comm_rank(MPI_COMM_WORLD);
687
688   if(rank==root)
689     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
690
691   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
692   extra->type = TRACING_GATHERV;
693   extra->send_size = send_size;
694   extra->recvcounts= xbt_new(int,comm_size);
695   for(i=0; i< comm_size; i++)//copy data to avoid bad free
696     extra->recvcounts[i] = recvcounts[i];
697   extra->root = root;
698   extra->num_processes = comm_size;
699   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
700   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
701
702   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
703
704   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
705
706   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
707   log_timed_action (action, clock);
708   xbt_free(recvcounts);
709   xbt_free(disps);
710 }
711
712 static void action_reducescatter(const char *const *action) {
713  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
714 0 reduceScatter 275427 275427 275427 204020 11346849 0
715
716   where: 
717   1) The first four values after the name of the action declare the recvcounts array
718   2) The value 11346849 is the amount of instructions
719   3) The last value corresponds to the datatype, see decode_datatype().
720
721   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
722   double clock = smpi_process_simulated_elapsed();
723   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
724   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
725   int comp_size = parse_double(action[2+comm_size]);
726   int *recvcounts = xbt_new0(int, comm_size);  
727   int *disps = xbt_new0(int, comm_size);  
728   int i=0;
729   int rank = smpi_process_index();
730   int size = 0;
731   if(action[3+comm_size])
732     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
733   else
734     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
735
736   for(i=0;i<comm_size;i++) {
737     recvcounts[i] = atoi(action[i+2]);
738     disps[i] = 0;
739     size+=recvcounts[i];
740   }
741
742   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
743   extra->type = TRACING_REDUCE_SCATTER;
744   extra->send_size = 0;
745   extra->recvcounts= xbt_new(int, comm_size);
746   for(i=0; i< comm_size; i++)//copy data to avoid bad free
747     extra->recvcounts[i] = recvcounts[i];
748   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
749   extra->comp_size = comp_size;
750   extra->num_processes = comm_size;
751
752   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
753
754   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
755   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
756    
757    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
758    smpi_execute_flops(comp_size);
759
760   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
761   xbt_free(recvcounts);
762   xbt_free(disps);
763   log_timed_action (action, clock);
764 }
765
766 static void action_allgather(const char *const *action) {
767   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
768   0 allGather 275427 275427
769
770   where: 
771   1) 275427 is the sendcount
772   2) 275427 is the recvcount
773   3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().  */
774   double clock = smpi_process_simulated_elapsed();
775
776   CHECK_ACTION_PARAMS(action, 2, 2)
777   int sendcount=atoi(action[2]); 
778   int recvcount=atoi(action[3]); 
779
780   MPI_Datatype MPI_CURRENT_TYPE2;
781
782   if(action[4] && action[5]) {
783     MPI_CURRENT_TYPE = decode_datatype(action[4]);
784     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
785   } else {
786     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
787     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
788   }
789   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
790   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
791
792   int rank = smpi_process_index();
793   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
794   extra->type = TRACING_ALLGATHER;
795   extra->send_size = sendcount;
796   extra->recv_size= recvcount;
797   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
798   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
799   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
800
801   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
802
803   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
804
805   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
806   log_timed_action (action, clock);
807 }
808
809 static void action_allgatherv(const char *const *action) {
810   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
811 0 allGatherV 275427 275427 275427 275427 204020
812
813   where: 
814   1) 275427 is the sendcount
815   2) The next four elements declare the recvcounts array
816   3) No more values mean that the datatype for sent and receive buffer
817   is the default one, see decode_datatype(). */
818   double clock = smpi_process_simulated_elapsed();
819
820   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
821   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
822   int i=0;
823   int sendcount=atoi(action[2]);
824   int *recvcounts = xbt_new0(int, comm_size);  
825   int *disps = xbt_new0(int, comm_size);  
826   int recv_sum=0;  
827   MPI_Datatype MPI_CURRENT_TYPE2;
828
829   if(action[3+comm_size] && action[4+comm_size]) {
830     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
831     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
832   } else {
833     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
834     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
835   }
836   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
837
838   for(i=0;i<comm_size;i++) {
839     recvcounts[i] = atoi(action[i+3]);
840     recv_sum=recv_sum+recvcounts[i];
841   }
842   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
843
844   int rank = smpi_process_index();
845   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
846   extra->type = TRACING_ALLGATHERV;
847   extra->send_size = sendcount;
848   extra->recvcounts= xbt_new(int, comm_size);
849   for(i=0; i< comm_size; i++)//copy data to avoid bad free
850     extra->recvcounts[i] = recvcounts[i];
851   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
852   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
853   extra->num_processes = comm_size;
854
855   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
856
857   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
858                           MPI_COMM_WORLD);
859
860   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
861   log_timed_action (action, clock);
862   xbt_free(recvcounts);
863   xbt_free(disps);
864 }
865
866 static void action_allToAllv(const char *const *action) {
867   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
868   0 allToAllV 100 1 7 10 12 100 1 70 10 5
869
870   where: 
871   1) 100 is the size of the send buffer *sizeof(int),
872   2) 1 7 10 12 is the sendcounts array
873   3) 100*sizeof(int) is the size of the receiver buffer
874   4)  1 70 10 5 is the recvcounts array */
875   double clock = smpi_process_simulated_elapsed();
876
877   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
878   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
879   int send_buf_size=0,recv_buf_size=0,i=0;
880   int *sendcounts = xbt_new0(int, comm_size);  
881   int *recvcounts = xbt_new0(int, comm_size);  
882   int *senddisps = xbt_new0(int, comm_size);  
883   int *recvdisps = xbt_new0(int, comm_size);  
884
885   MPI_Datatype MPI_CURRENT_TYPE2;
886
887   send_buf_size=parse_double(action[2]);
888   recv_buf_size=parse_double(action[3+comm_size]);
889   if(action[4+2*comm_size] && action[5+2*comm_size]) {
890     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
891     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
892   }
893   else{
894     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
895     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
896   }
897
898   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
899   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
900
901   for(i=0;i<comm_size;i++) {
902     sendcounts[i] = atoi(action[i+3]);
903     recvcounts[i] = atoi(action[i+4+comm_size]);
904   }
905
906   int rank = smpi_process_index();
907   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
908   extra->type = TRACING_ALLTOALLV;
909   extra->recvcounts= xbt_new(int, comm_size);
910   extra->sendcounts= xbt_new(int, comm_size);
911   extra->num_processes = comm_size;
912
913   for(i=0; i< comm_size; i++){//copy data to avoid bad free
914     extra->send_size += sendcounts[i];
915     extra->sendcounts[i] = sendcounts[i];
916     extra->recv_size += recvcounts[i];
917     extra->recvcounts[i] = recvcounts[i];
918   }
919   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
920   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
921
922   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
923
924   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
925                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
926
927   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
928   log_timed_action (action, clock);
929   xbt_free(sendcounts);
930   xbt_free(recvcounts);
931   xbt_free(senddisps);
932   xbt_free(recvdisps);
933 }
934
935 void smpi_replay_run(int *argc, char***argv){
936   /* First initializes everything */
937   smpi_process_init(argc, argv);
938   smpi_process_mark_as_initialized();
939   smpi_process_set_replaying(true);
940
941   int rank = smpi_process_index();
942   TRACE_smpi_init(rank);
943   TRACE_smpi_computing_init(rank);
944   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
945   extra->type = TRACING_INIT;
946   char *operation =bprintf("%s_init",__FUNCTION__);
947   TRACE_smpi_collective_in(rank, -1, operation, extra);
948   TRACE_smpi_collective_out(rank, -1, operation);
949   xbt_free(operation);
950
951   if (_xbt_replay_action_init()==0) {
952     xbt_replay_action_register("init",       action_init);
953     xbt_replay_action_register("finalize",   action_finalize);
954     xbt_replay_action_register("comm_size",  action_comm_size);
955     xbt_replay_action_register("comm_split", action_comm_split);
956     xbt_replay_action_register("comm_dup",   action_comm_dup);
957     xbt_replay_action_register("send",       action_send);
958     xbt_replay_action_register("Isend",      action_Isend);
959     xbt_replay_action_register("recv",       action_recv);
960     xbt_replay_action_register("Irecv",      action_Irecv);
961     xbt_replay_action_register("test",       action_test);
962     xbt_replay_action_register("wait",       action_wait);
963     xbt_replay_action_register("waitAll",    action_waitall);
964     xbt_replay_action_register("barrier",    action_barrier);
965     xbt_replay_action_register("bcast",      action_bcast);
966     xbt_replay_action_register("reduce",     action_reduce);
967     xbt_replay_action_register("allReduce",  action_allReduce);
968     xbt_replay_action_register("allToAll",   action_allToAll);
969     xbt_replay_action_register("allToAllV",  action_allToAllv);
970     xbt_replay_action_register("gather",  action_gather);
971     xbt_replay_action_register("gatherV",  action_gatherv);
972     xbt_replay_action_register("allGather",  action_allgather);
973     xbt_replay_action_register("allGatherV",  action_allgatherv);
974     xbt_replay_action_register("reduceScatter",  action_reducescatter);
975     xbt_replay_action_register("compute",    action_compute);
976   }
977
978   //if we have a delayed start, sleep here.
979   if(*argc>2){
980     char *endptr;
981     double value = strtod((*argv)[2], &endptr);
982     if (*endptr != '\0')
983       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
984     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
985     smpi_execute_flops(value);
986   } else {
987     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
988     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
989     smpi_execute_flops(0.0);
990   }
991
992   /* Actually run the replay */
993   xbt_replay_action_runner(*argc, *argv);
994
995   /* and now, finalize everything */
996   double sim_time= 1.;
997   /* One active process will stop. Decrease the counter*/
998   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
999   if (!get_reqq_self()->empty()){
1000     unsigned int count_requests=get_reqq_self()->size();
1001     MPI_Request requests[count_requests];
1002     MPI_Status status[count_requests];
1003     unsigned int i=0;
1004
1005     for (auto req: *get_reqq_self()){
1006       requests[i] = req;
1007       i++;
1008     }
1009     smpi_mpi_waitall(count_requests, requests, status);
1010     active_processes--;
1011   } else {
1012     active_processes--;
1013   }
1014
1015   if(active_processes==0){
1016     /* Last process alive speaking */
1017     /* end the simulated timer */
1018     sim_time = smpi_process_simulated_elapsed();
1019     XBT_INFO("Simulation time %f", sim_time);
1020     _xbt_replay_action_exit();
1021     xbt_free(sendbuffer);
1022     xbt_free(recvbuffer);
1023   }
1024
1025   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1026   extra_fin->type = TRACING_FINALIZE;
1027   operation =bprintf("%s_finalize",__FUNCTION__);
1028   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1029
1030   smpi_process_finalize();
1031
1032   TRACE_smpi_collective_out(rank, -1, operation);
1033   TRACE_smpi_finalize(smpi_process_index());
1034   smpi_process_destroy();
1035   xbt_free(operation);
1036 }