Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
some more files for sonar
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15
16 int communicator_size = 0;
17 static int active_processes = 0;
18 xbt_dict_t reqq = NULL;
19
20 MPI_Datatype MPI_DEFAULT_TYPE;
21 MPI_Datatype MPI_CURRENT_TYPE;
22
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
27
28 static void log_timed_action (const char *const *action, double clock){
29   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30     char *name = xbt_str_join_array(action, " ");
31     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
32     xbt_free(name);
33   }
34 }
35
36 static xbt_dynar_t get_reqq_self()
37 {
38    char * key = bprintf("%d", smpi_process_index());
39    xbt_dynar_t dynar_mpi_request = static_cast<xbt_dynar_t>(xbt_dict_get(reqq, key));
40    xbt_free(key);
41  
42    return dynar_mpi_request;
43 }
44
45 static void set_reqq_self(xbt_dynar_t mpi_request)
46 {
47    char * key = bprintf("%d", smpi_process_index());
48    xbt_dict_set(reqq, key, mpi_request, free);
49    xbt_free(key);
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (!smpi_process_get_replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (!smpi_process_get_replaying())
67   return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (!smpi_process_get_replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   double value;
84   char *endptr;
85   value = strtod(string, &endptr);
86   if (*endptr != '\0')
87     THROWF(unknown_error, 0, "%s is not a double", string);
88   return value;
89 }
90
91 static MPI_Datatype decode_datatype(const char *const action)
92 {
93 // Declared datatypes,
94   switch(atoi(action)) {
95     case 0:
96       MPI_CURRENT_TYPE=MPI_DOUBLE;
97       break;
98     case 1:
99       MPI_CURRENT_TYPE=MPI_INT;
100       break;
101     case 2:
102       MPI_CURRENT_TYPE=MPI_CHAR;
103       break;
104     case 3:
105       MPI_CURRENT_TYPE=MPI_SHORT;
106       break;
107     case 4:
108       MPI_CURRENT_TYPE=MPI_LONG;
109       break;
110     case 5:
111       MPI_CURRENT_TYPE=MPI_FLOAT;
112       break;
113     case 6:
114       MPI_CURRENT_TYPE=MPI_BYTE;
115       break;
116     default:
117       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118   }
119    return MPI_CURRENT_TYPE;
120 }
121
122
123 const char* encode_datatype(MPI_Datatype datatype, int* known)
124 {
125   //default type for output is set to MPI_BYTE
126   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
127   if(known!=NULL)
128     *known=1;
129   if (datatype==MPI_BYTE){
130       return "";
131   }
132   if(datatype==MPI_DOUBLE)
133       return "0";
134   if(datatype==MPI_INT)
135       return "1";
136   if(datatype==MPI_CHAR)
137       return "2";
138   if(datatype==MPI_SHORT)
139       return "3";
140   if(datatype==MPI_LONG)
141     return "4";
142   if(datatype==MPI_FLOAT)
143       return "5";
144   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
145   if(known!=NULL)
146     *known=0;
147   // default - not implemented.
148   // do not warn here as we pass in this function even for other trace formats
149   return "-1";
150 }
151
152 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
153     int i=0;\
154     while(action[i]!=NULL)\
155      i++;\
156     if(i<mandatory+2)                                           \
157     THROWF(arg_error, 0, "%s replay failed.\n" \
158           "%d items were given on the line. First two should be process_id and action.  " \
159           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
160           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
161   }
162
163 static void action_init(const char *const *action)
164 {
165   XBT_DEBUG("Initialize the counters");
166   CHECK_ACTION_PARAMS(action, 0, 1)
167   if(action[2]) 
168     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
169   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
170
171   /* start a simulated timer */
172   smpi_process_simulated_start();
173   /*initialize the number of active processes */
174   active_processes = smpi_process_count();
175
176   if (reqq==NULL) {
177     reqq = xbt_dict_new();
178   }
179
180   set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
181 }
182
183 static void action_finalize(const char *const *action)
184 {
185   /* do nothing */
186 }
187
188 static void action_comm_size(const char *const *action)
189 {
190   double clock = smpi_process_simulated_elapsed();
191
192   communicator_size = parse_double(action[2]);
193   log_timed_action (action, clock);
194 }
195
196 static void action_comm_split(const char *const *action)
197 {
198   double clock = smpi_process_simulated_elapsed();
199
200   log_timed_action (action, clock);
201 }
202
203 static void action_comm_dup(const char *const *action)
204 {
205   double clock = smpi_process_simulated_elapsed();
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_compute(const char *const *action)
211 {
212   CHECK_ACTION_PARAMS(action, 1, 0)
213   double clock = smpi_process_simulated_elapsed();
214   double flops= parse_double(action[2]);
215   int rank = smpi_process_index();
216   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
217   extra->type=TRACING_COMPUTING;
218   extra->comp_size=flops;
219   TRACE_smpi_computing_in(rank, extra);
220
221   smpi_execute_flops(flops);
222
223   TRACE_smpi_computing_out(rank);
224   log_timed_action (action, clock);
225 }
226
227 static void action_send(const char *const *action)
228 {
229   CHECK_ACTION_PARAMS(action, 2, 1)
230   int to = atoi(action[2]);
231   double size=parse_double(action[3]);
232   double clock = smpi_process_simulated_elapsed();
233
234   if(action[4])
235     MPI_CURRENT_TYPE=decode_datatype(action[4]);
236   else
237     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
238
239   int rank = smpi_process_index();
240
241   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
242   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
243   extra->type = TRACING_SEND;
244   extra->send_size = size;
245   extra->src = rank;
246   extra->dst = dst_traced;
247   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
248   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
249   if (!TRACE_smpi_view_internals()) {
250     TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
251   }
252
253   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
254
255   log_timed_action (action, clock);
256
257   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
258 }
259
260 static void action_Isend(const char *const *action)
261 {
262   CHECK_ACTION_PARAMS(action, 2, 1)
263   int to = atoi(action[2]);
264   double size=parse_double(action[3]);
265   double clock = smpi_process_simulated_elapsed();
266   MPI_Request request;
267
268   if(action[4]) 
269     MPI_CURRENT_TYPE=decode_datatype(action[4]);
270   else 
271     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
272
273   int rank = smpi_process_index();
274   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
275   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
276   extra->type = TRACING_ISEND;
277   extra->send_size = size;
278   extra->src = rank;
279   extra->dst = dst_traced;
280   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
281   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
282   if (!TRACE_smpi_view_internals()) {
283     TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
284   }
285
286   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
287
288   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
289   request->send = 1;
290
291   xbt_dynar_push(get_reqq_self(),&request);
292
293   log_timed_action (action, clock);
294 }
295
296 static void action_recv(const char *const *action) {
297   CHECK_ACTION_PARAMS(action, 2, 1)
298   int from = atoi(action[2]);
299   double size=parse_double(action[3]);
300   double clock = smpi_process_simulated_elapsed();
301   MPI_Status status;
302
303   if(action[4]) 
304     MPI_CURRENT_TYPE=decode_datatype(action[4]);
305   else 
306     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
307
308   int rank = smpi_process_index();
309   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
310
311   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
312   extra->type = TRACING_RECV;
313   extra->send_size = size;
314   extra->src = src_traced;
315   extra->dst = rank;
316   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
317   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
318
319   //unknow size from the receiver pov
320   if(size<=0.0){
321       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
322       size=status.count;
323   }
324
325   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
326
327   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
328   if (!TRACE_smpi_view_internals()) {
329     TRACE_smpi_recv(rank, src_traced, rank);
330   }
331
332   log_timed_action (action, clock);
333 }
334
335 static void action_Irecv(const char *const *action)
336 {
337   CHECK_ACTION_PARAMS(action, 2, 1)
338   int from = atoi(action[2]);
339   double size=parse_double(action[3]);
340   double clock = smpi_process_simulated_elapsed();
341   MPI_Request request;
342
343   if(action[4]) 
344     MPI_CURRENT_TYPE=decode_datatype(action[4]);
345   else 
346     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
347
348   int rank = smpi_process_index();
349   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
350   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
351   extra->type = TRACING_IRECV;
352   extra->send_size = size;
353   extra->src = src_traced;
354   extra->dst = rank;
355   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
356   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
357   MPI_Status status;
358   //unknow size from the receiver pov
359   if(size<=0.0){
360       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
361       size=status.count;
362   }
363
364   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
365
366   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
367   request->recv = 1;
368   xbt_dynar_push(get_reqq_self(),&request);
369
370   log_timed_action (action, clock);
371 }
372
373 static void action_test(const char *const *action){
374   CHECK_ACTION_PARAMS(action, 0, 0)
375   double clock = smpi_process_simulated_elapsed();
376   MPI_Request request;
377   MPI_Status status;
378   int flag = true;
379
380   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
381   //if request is null here, this may mean that a previous test has succeeded 
382   //Different times in traced application and replayed version may lead to this 
383   //In this case, ignore the extra calls.
384   if(request!=NULL){
385     int rank = smpi_process_index();
386     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
387     extra->type=TRACING_TEST;
388     TRACE_smpi_testing_in(rank, extra);
389
390     flag = smpi_mpi_test(&request, &status);
391
392     XBT_DEBUG("MPI_Test result: %d", flag);
393     /* push back request in dynar to be caught by a subsequent wait. if the test did succeed, the request is now NULL.*/
394     xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
395
396     TRACE_smpi_testing_out(rank);
397   }
398   log_timed_action (action, clock);
399 }
400
401 static void action_wait(const char *const *action){
402   CHECK_ACTION_PARAMS(action, 0, 0)
403   double clock = smpi_process_simulated_elapsed();
404   MPI_Request request;
405   MPI_Status status;
406
407   xbt_assert(xbt_dynar_length(get_reqq_self()),
408       "action wait not preceded by any irecv or isend: %s",
409       xbt_str_join_array(action," "));
410   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
411
412   if (request==NULL){
413     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
414     return;
415   }
416
417   int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
418
419   MPI_Group group = smpi_comm_group(request->comm);
420   int src_traced = smpi_group_rank(group, request->src);
421   int dst_traced = smpi_group_rank(group, request->dst);
422   int is_wait_for_receive = request->recv;
423   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
424   extra->type = TRACING_WAIT;
425   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
426
427   smpi_mpi_wait(&request, &status);
428
429   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
430   if (is_wait_for_receive)
431     TRACE_smpi_recv(rank, src_traced, dst_traced);
432   log_timed_action (action, clock);
433 }
434
435 static void action_waitall(const char *const *action){
436   CHECK_ACTION_PARAMS(action, 0, 0)
437   double clock = smpi_process_simulated_elapsed();
438   int count_requests=0;
439   unsigned int i=0;
440
441   count_requests=xbt_dynar_length(get_reqq_self());
442
443   if (count_requests>0) {
444     MPI_Request requests[count_requests];
445     MPI_Status status[count_requests];
446
447     /*  The reqq is an array of dynars. Its index corresponds to the rank.
448      Thus each rank saves its own requests to the array request. */
449     xbt_dynar_foreach(get_reqq_self(),i,requests[i]); 
450
451    //save information from requests
452    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
453    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
454    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
455    for (i = 0; static_cast<int>(i) < count_requests; i++) {
456     if(requests[i]){
457       int *asrc = xbt_new(int, 1);
458       int *adst = xbt_new(int, 1);
459       int *arecv = xbt_new(int, 1);
460       *asrc = requests[i]->src;
461       *adst = requests[i]->dst;
462       *arecv = requests[i]->recv;
463       xbt_dynar_insert_at(srcs, i, asrc);
464       xbt_dynar_insert_at(dsts, i, adst);
465       xbt_dynar_insert_at(recvs, i, arecv);
466       xbt_free(asrc);
467       xbt_free(adst);
468       xbt_free(arecv);
469     }else {
470       int *t = xbt_new(int, 1);
471       xbt_dynar_insert_at(srcs, i, t);
472       xbt_dynar_insert_at(dsts, i, t);
473       xbt_dynar_insert_at(recvs, i, t);
474       xbt_free(t);
475     }
476    }
477    int rank_traced = smpi_process_index();
478    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
479    extra->type = TRACING_WAITALL;
480    extra->send_size=count_requests;
481    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
482
483    smpi_mpi_waitall(count_requests, requests, status);
484
485    for (i = 0; static_cast<int>(i) < count_requests; i++) {
486     int src_traced, dst_traced, is_wait_for_receive;
487     xbt_dynar_get_cpy(srcs, i, &src_traced);
488     xbt_dynar_get_cpy(dsts, i, &dst_traced);
489     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
490     if (is_wait_for_receive) {
491       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
492     }
493    }
494    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
495    //clean-up of dynars
496    xbt_dynar_free(&srcs);
497    xbt_dynar_free(&dsts);
498    xbt_dynar_free(&recvs);
499    set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
500   }
501   log_timed_action (action, clock);
502 }
503
504 static void action_barrier(const char *const *action){
505   double clock = smpi_process_simulated_elapsed();
506   int rank = smpi_process_index();
507   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
508   extra->type = TRACING_BARRIER;
509   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
510
511   mpi_coll_barrier_fun(MPI_COMM_WORLD);
512
513   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
514   log_timed_action (action, clock);
515 }
516
517 static void action_bcast(const char *const *action)
518 {
519   CHECK_ACTION_PARAMS(action, 1, 2)
520   double size = parse_double(action[2]);
521   double clock = smpi_process_simulated_elapsed();
522   int root=0;
523   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
524   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
525
526   if(action[3]) {
527     root= atoi(action[3]);
528     if(action[4]) {
529       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
530     }
531   }
532
533   int rank = smpi_process_index();
534   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
535
536   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537   extra->type = TRACING_BCAST;
538   extra->send_size = size;
539   extra->root = root_traced;
540   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
541   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
542   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
543
544   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
545
546   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
547   log_timed_action (action, clock);
548 }
549
550 static void action_reduce(const char *const *action)
551 {
552   CHECK_ACTION_PARAMS(action, 2, 2)
553   double comm_size = parse_double(action[2]);
554   double comp_size = parse_double(action[3]);
555   double clock = smpi_process_simulated_elapsed();
556   int root=0;
557   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
558
559   if(action[4]) {
560     root= atoi(action[4]);
561     if(action[5]) {
562       MPI_CURRENT_TYPE=decode_datatype(action[5]);
563     }
564   }
565
566   int rank = smpi_process_index();
567   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
568   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
569   extra->type = TRACING_REDUCE;
570   extra->send_size = comm_size;
571   extra->comp_size = comp_size;
572   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
573   extra->root = root_traced;
574
575   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
576
577   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
578   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
579   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
580   smpi_execute_flops(comp_size);
581
582   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
583   log_timed_action (action, clock);
584 }
585
586 static void action_allReduce(const char *const *action) {
587   CHECK_ACTION_PARAMS(action, 2, 1)
588   double comm_size = parse_double(action[2]);
589   double comp_size = parse_double(action[3]);
590
591   if(action[4])
592     MPI_CURRENT_TYPE=decode_datatype(action[4]);
593   else
594     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
595
596   double clock = smpi_process_simulated_elapsed();
597   int rank = smpi_process_index();
598   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
599   extra->type = TRACING_ALLREDUCE;
600   extra->send_size = comm_size;
601   extra->comp_size = comp_size;
602   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
603   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
604
605   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
606   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
607   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
608   smpi_execute_flops(comp_size);
609
610   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
611   log_timed_action (action, clock);
612 }
613
614 static void action_allToAll(const char *const *action) {
615   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
616                                      //two optional (corresponding datatypes)
617   double clock = smpi_process_simulated_elapsed();
618   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
619   int send_size = parse_double(action[2]);
620   int recv_size = parse_double(action[3]);
621   MPI_Datatype MPI_CURRENT_TYPE2;
622
623   if(action[4] && action[5]) {
624     MPI_CURRENT_TYPE=decode_datatype(action[4]);
625     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
626   }
627   else{
628     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
629     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
630   }
631
632   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
633   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
634
635   int rank = smpi_process_index();
636   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
637   extra->type = TRACING_ALLTOALL;
638   extra->send_size = send_size;
639   extra->recv_size = recv_size;
640   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
641   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
642
643   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
644
645   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
646
647   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
648   log_timed_action (action, clock);
649 }
650
651 static void action_gather(const char *const *action) {
652   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
653  0 gather 68 68 0 0 0
654
655   where: 
656   1) 68 is the sendcounts
657   2) 68 is the recvcounts
658   3) 0 is the root node
659   4) 0 is the send datatype id, see decode_datatype()
660   5) 0 is the recv datatype id, see decode_datatype()
661   */
662   CHECK_ACTION_PARAMS(action, 2, 3)
663   double clock = smpi_process_simulated_elapsed();
664   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
665   int send_size = parse_double(action[2]);
666   int recv_size = parse_double(action[3]);
667   MPI_Datatype MPI_CURRENT_TYPE2;
668   if(action[4] && action[5]) {
669     MPI_CURRENT_TYPE=decode_datatype(action[5]);
670     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
671   } else {
672     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
673     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
674   }
675   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
676   void *recv = NULL;
677   int root=0;
678   if(action[4])
679     root=atoi(action[4]);
680   int rank = smpi_comm_rank(MPI_COMM_WORLD);
681
682   if(rank==root)
683     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
684
685   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
686   extra->type = TRACING_GATHER;
687   extra->send_size = send_size;
688   extra->recv_size = recv_size;
689   extra->root = root;
690   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
691   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
692
693   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
694
695   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
696
697   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
698   log_timed_action (action, clock);
699 }
700
701 static void action_gatherv(const char *const *action) {
702   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
703  0 gather 68 68 10 10 10 0 0 0
704
705   where:
706   1) 68 is the sendcount
707   2) 68 10 10 10 is the recvcounts
708   3) 0 is the root node
709   4) 0 is the send datatype id, see decode_datatype()
710   5) 0 is the recv datatype id, see decode_datatype()
711   */
712
713   double clock = smpi_process_simulated_elapsed();
714   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
715   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
716   int send_size = parse_double(action[2]);
717   int *disps = xbt_new0(int, comm_size);
718   int *recvcounts = xbt_new0(int, comm_size);
719   int i=0,recv_sum=0;
720
721   MPI_Datatype MPI_CURRENT_TYPE2;
722   if(action[4+comm_size] && action[5+comm_size]) {
723     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
724     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
725   } else {
726     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
727     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
728   }
729   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
730   void *recv = NULL;
731   for(i=0;i<comm_size;i++) {
732     recvcounts[i] = atoi(action[i+3]);
733     recv_sum=recv_sum+recvcounts[i];
734     disps[i] = 0;
735   }
736
737   int root=atoi(action[3+comm_size]);
738   int rank = smpi_comm_rank(MPI_COMM_WORLD);
739
740   if(rank==root)
741     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
742
743   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
744   extra->type = TRACING_GATHERV;
745   extra->send_size = send_size;
746   extra->recvcounts= xbt_new(int,comm_size);
747   for(i=0; i< comm_size; i++)//copy data to avoid bad free
748     extra->recvcounts[i] = recvcounts[i];
749   extra->root = root;
750   extra->num_processes = comm_size;
751   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
752   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
753
754   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
755
756   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
757
758   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
759   log_timed_action (action, clock);
760   xbt_free(recvcounts);
761   xbt_free(disps);
762 }
763
764 static void action_reducescatter(const char *const *action) {
765  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
766 0 reduceScatter 275427 275427 275427 204020 11346849 0
767
768   where: 
769   1) The first four values after the name of the action declare the recvcounts array
770   2) The value 11346849 is the amount of instructions
771   3) The last value corresponds to the datatype, see decode_datatype().
772
773   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
774   double clock = smpi_process_simulated_elapsed();
775   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
776   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
777   int comp_size = parse_double(action[2+comm_size]);
778   int *recvcounts = xbt_new0(int, comm_size);  
779   int *disps = xbt_new0(int, comm_size);  
780   int i=0;
781   int rank = smpi_process_index();
782   int size = 0;
783   if(action[3+comm_size])
784     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
785   else
786     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
787
788   for(i=0;i<comm_size;i++) {
789     recvcounts[i] = atoi(action[i+2]);
790     disps[i] = 0;
791     size+=recvcounts[i];
792   }
793
794   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
795   extra->type = TRACING_REDUCE_SCATTER;
796   extra->send_size = 0;
797   extra->recvcounts= xbt_new(int, comm_size);
798   for(i=0; i< comm_size; i++)//copy data to avoid bad free
799     extra->recvcounts[i] = recvcounts[i];
800   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
801   extra->comp_size = comp_size;
802   extra->num_processes = comm_size;
803
804   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
805
806   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
807   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
808    
809    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
810    smpi_execute_flops(comp_size);
811
812   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
813   xbt_free(recvcounts);
814   xbt_free(disps);
815   log_timed_action (action, clock);
816 }
817
818 static void action_allgather(const char *const *action) {
819   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
820   0 allGather 275427 275427
821
822   where: 
823   1) 275427 is the sendcount
824   2) 275427 is the recvcount
825   3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().  */
826   double clock = smpi_process_simulated_elapsed();
827
828   CHECK_ACTION_PARAMS(action, 2, 2)
829   int sendcount=atoi(action[2]); 
830   int recvcount=atoi(action[3]); 
831
832   MPI_Datatype MPI_CURRENT_TYPE2;
833
834   if(action[4] && action[5]) {
835     MPI_CURRENT_TYPE = decode_datatype(action[4]);
836     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
837   } else {
838     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
839     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
840   }
841   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
842   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
843
844   int rank = smpi_process_index();
845   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
846   extra->type = TRACING_ALLGATHER;
847   extra->send_size = sendcount;
848   extra->recv_size= recvcount;
849   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
850   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
851   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
852
853   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
854
855   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
856
857   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
858   log_timed_action (action, clock);
859 }
860
861 static void action_allgatherv(const char *const *action) {
862   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
863 0 allGatherV 275427 275427 275427 275427 204020
864
865   where: 
866   1) 275427 is the sendcount
867   2) The next four elements declare the recvcounts array
868   3) No more values mean that the datatype for sent and receive buffer
869   is the default one, see decode_datatype(). */
870   double clock = smpi_process_simulated_elapsed();
871
872   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
873   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
874   int i=0;
875   int sendcount=atoi(action[2]);
876   int *recvcounts = xbt_new0(int, comm_size);  
877   int *disps = xbt_new0(int, comm_size);  
878   int recv_sum=0;  
879   MPI_Datatype MPI_CURRENT_TYPE2;
880
881   if(action[3+comm_size] && action[4+comm_size]) {
882     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
883     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
884   } else {
885     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
886     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
887   }
888   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
889
890   for(i=0;i<comm_size;i++) {
891     recvcounts[i] = atoi(action[i+3]);
892     recv_sum=recv_sum+recvcounts[i];
893   }
894   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
895
896   int rank = smpi_process_index();
897   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
898   extra->type = TRACING_ALLGATHERV;
899   extra->send_size = sendcount;
900   extra->recvcounts= xbt_new(int, comm_size);
901   for(i=0; i< comm_size; i++)//copy data to avoid bad free
902     extra->recvcounts[i] = recvcounts[i];
903   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
904   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
905   extra->num_processes = comm_size;
906
907   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
908
909   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
910                           MPI_COMM_WORLD);
911
912   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
913   log_timed_action (action, clock);
914   xbt_free(recvcounts);
915   xbt_free(disps);
916 }
917
918 static void action_allToAllv(const char *const *action) {
919   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
920   0 allToAllV 100 1 7 10 12 100 1 70 10 5
921
922   where: 
923   1) 100 is the size of the send buffer *sizeof(int),
924   2) 1 7 10 12 is the sendcounts array
925   3) 100*sizeof(int) is the size of the receiver buffer
926   4)  1 70 10 5 is the recvcounts array */
927   double clock = smpi_process_simulated_elapsed();
928
929   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
930   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
931   int send_buf_size=0,recv_buf_size=0,i=0;
932   int *sendcounts = xbt_new0(int, comm_size);  
933   int *recvcounts = xbt_new0(int, comm_size);  
934   int *senddisps = xbt_new0(int, comm_size);  
935   int *recvdisps = xbt_new0(int, comm_size);  
936
937   MPI_Datatype MPI_CURRENT_TYPE2;
938
939   send_buf_size=parse_double(action[2]);
940   recv_buf_size=parse_double(action[3+comm_size]);
941   if(action[4+2*comm_size] && action[5+2*comm_size]) {
942     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
943     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
944   }
945   else{
946     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
947     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
948   }
949
950   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
951   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
952
953   for(i=0;i<comm_size;i++) {
954     sendcounts[i] = atoi(action[i+3]);
955     recvcounts[i] = atoi(action[i+4+comm_size]);
956   }
957
958   int rank = smpi_process_index();
959   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
960   extra->type = TRACING_ALLTOALLV;
961   extra->recvcounts= xbt_new(int, comm_size);
962   extra->sendcounts= xbt_new(int, comm_size);
963   extra->num_processes = comm_size;
964
965   for(i=0; i< comm_size; i++){//copy data to avoid bad free
966     extra->send_size += sendcounts[i];
967     extra->sendcounts[i] = sendcounts[i];
968     extra->recv_size += recvcounts[i];
969     extra->recvcounts[i] = recvcounts[i];
970   }
971   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
972   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
973
974   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
975
976   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
977                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
978
979   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
980   log_timed_action (action, clock);
981   xbt_free(sendcounts);
982   xbt_free(recvcounts);
983   xbt_free(senddisps);
984   xbt_free(recvdisps);
985 }
986
987 void smpi_replay_run(int *argc, char***argv){
988   /* First initializes everything */
989   smpi_process_init(argc, argv);
990   smpi_process_mark_as_initialized();
991   smpi_process_set_replaying(true);
992
993   int rank = smpi_process_index();
994   TRACE_smpi_init(rank);
995   TRACE_smpi_computing_init(rank);
996   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
997   extra->type = TRACING_INIT;
998   char *operation =bprintf("%s_init",__FUNCTION__);
999   TRACE_smpi_collective_in(rank, -1, operation, extra);
1000   TRACE_smpi_collective_out(rank, -1, operation);
1001   xbt_free(operation);
1002
1003   if (_xbt_replay_action_init()==0) {
1004     xbt_replay_action_register("init",       action_init);
1005     xbt_replay_action_register("finalize",   action_finalize);
1006     xbt_replay_action_register("comm_size",  action_comm_size);
1007     xbt_replay_action_register("comm_split", action_comm_split);
1008     xbt_replay_action_register("comm_dup",   action_comm_dup);
1009     xbt_replay_action_register("send",       action_send);
1010     xbt_replay_action_register("Isend",      action_Isend);
1011     xbt_replay_action_register("recv",       action_recv);
1012     xbt_replay_action_register("Irecv",      action_Irecv);
1013     xbt_replay_action_register("test",       action_test);
1014     xbt_replay_action_register("wait",       action_wait);
1015     xbt_replay_action_register("waitAll",    action_waitall);
1016     xbt_replay_action_register("barrier",    action_barrier);
1017     xbt_replay_action_register("bcast",      action_bcast);
1018     xbt_replay_action_register("reduce",     action_reduce);
1019     xbt_replay_action_register("allReduce",  action_allReduce);
1020     xbt_replay_action_register("allToAll",   action_allToAll);
1021     xbt_replay_action_register("allToAllV",  action_allToAllv);
1022     xbt_replay_action_register("gather",  action_gather);
1023     xbt_replay_action_register("gatherV",  action_gatherv);
1024     xbt_replay_action_register("allGather",  action_allgather);
1025     xbt_replay_action_register("allGatherV",  action_allgatherv);
1026     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1027     xbt_replay_action_register("compute",    action_compute);
1028   }
1029
1030   //if we have a delayed start, sleep here.
1031   if(*argc>2){
1032     char *endptr;
1033     double value = strtod((*argv)[2], &endptr);
1034     if (*endptr != '\0')
1035       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1036     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1037     smpi_execute_flops(value);
1038   } else {
1039     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1040     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
1041     smpi_execute_flops(0.0);
1042   }
1043
1044   /* Actually run the replay */
1045   xbt_replay_action_runner(*argc, *argv);
1046
1047   /* and now, finalize everything */
1048   double sim_time= 1.;
1049   /* One active process will stop. Decrease the counter*/
1050   XBT_DEBUG("There are %lu elements in reqq[*]", xbt_dynar_length(get_reqq_self()));
1051   if (xbt_dynar_is_empty(get_reqq_self())==0){
1052     int count_requests=xbt_dynar_length(get_reqq_self());
1053     MPI_Request requests[count_requests];
1054     MPI_Status status[count_requests];
1055     unsigned int i;
1056
1057     xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1058     smpi_mpi_waitall(count_requests, requests, status);
1059     active_processes--;
1060   } else {
1061     active_processes--;
1062   }
1063
1064   if(active_processes==0){
1065     /* Last process alive speaking */
1066     /* end the simulated timer */
1067     sim_time = smpi_process_simulated_elapsed();
1068     XBT_INFO("Simulation time %f", sim_time);
1069     _xbt_replay_action_exit();
1070     xbt_free(sendbuffer);
1071     xbt_free(recvbuffer);
1072     xbt_dict_free(&reqq); //not need, data have been freed ???
1073     reqq = NULL;
1074   }
1075
1076   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1077   extra_fin->type = TRACING_FINALIZE;
1078   operation =bprintf("%s_finalize",__FUNCTION__);
1079   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1080
1081   smpi_process_finalize();
1082
1083   TRACE_smpi_collective_out(rank, -1, operation);
1084   TRACE_smpi_finalize(smpi_process_index());
1085   smpi_process_destroy();
1086   xbt_free(operation);
1087 }