Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
set extra data to 0 at allocation time to avoid bad surprises
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static void log_timed_action (const char *const *action, double clock){
23   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24     char *name = xbt_str_join_array(action, " ");
25     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
26     free(name);
27   }
28 }
29
30 typedef struct {
31   xbt_dynar_t irecvs; /* of MPI_Request */
32 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
33
34
35 /* Helper function */
36 static double parse_double(const char *string)
37 {
38   double value;
39   char *endptr;
40   value = strtod(string, &endptr);
41   if (*endptr != '\0')
42     THROWF(unknown_error, 0, "%s is not a double", string);
43   return value;
44 }
45
46 static MPI_Datatype decode_datatype(const char *const action)
47 {
48 // Declared datatypes,
49
50   switch(atoi(action))
51   {
52     case 0:
53       MPI_CURRENT_TYPE=MPI_DOUBLE;
54       break;
55     case 1:
56       MPI_CURRENT_TYPE=MPI_INT;
57       break;
58     case 2:
59       MPI_CURRENT_TYPE=MPI_CHAR;
60       break;
61     case 3:
62       MPI_CURRENT_TYPE=MPI_SHORT;
63       break;
64     case 4:
65       MPI_CURRENT_TYPE=MPI_LONG;
66       break;
67     case 5:
68       MPI_CURRENT_TYPE=MPI_FLOAT;
69       break;
70     case 6:
71       MPI_CURRENT_TYPE=MPI_BYTE;
72       break;
73     default:
74       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
75
76   }
77    return MPI_CURRENT_TYPE;
78 }
79
80
81 const char* encode_datatype(MPI_Datatype datatype)
82 {
83
84   //default type for output is set to MPI_BYTE
85   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
86   if (datatype==MPI_BYTE){
87       return "";
88   }
89   if(datatype==MPI_DOUBLE)
90       return "0";
91   if(datatype==MPI_INT)
92       return "1";
93   if(datatype==MPI_CHAR)
94       return "2";
95   if(datatype==MPI_SHORT)
96       return "3";
97   if(datatype==MPI_LONG)
98     return "4";
99   if(datatype==MPI_FLOAT)
100       return "5";
101
102   // default - not implemented.
103   // do not warn here as we pass in this function even for other trace formats
104   return "-1";
105 }
106
107 static void action_init(const char *const *action)
108 {
109   int i;
110   XBT_DEBUG("Initialize the counters");
111   smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
112   globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
113
114   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
115   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
116
117   smpi_process_set_user_data((void*) globals);
118
119   /* start a simulated timer */
120   smpi_process_simulated_start();
121   /*initialize the number of active processes */
122   active_processes = smpi_process_count();
123
124   if (!reqq) {
125     reqq=xbt_new0(xbt_dynar_t,active_processes);
126
127     for(i=0;i<active_processes;i++){
128       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),NULL);
129     }
130   }
131 }
132
133 static void action_finalize(const char *const *action)
134 {
135   smpi_replay_globals_t globals =
136       (smpi_replay_globals_t) smpi_process_get_user_data();
137   if (globals){
138     XBT_DEBUG("There are %lu irecvs in the dynar",
139          xbt_dynar_length(globals->irecvs));
140     xbt_dynar_free_container(&(globals->irecvs));
141   }
142   free(globals);
143 }
144
145 static void action_comm_size(const char *const *action)
146 {
147   double clock = smpi_process_simulated_elapsed();
148
149   communicator_size = parse_double(action[2]);
150   log_timed_action (action, clock);
151 }
152
153 static void action_comm_split(const char *const *action)
154 {
155   double clock = smpi_process_simulated_elapsed();
156
157   log_timed_action (action, clock);
158 }
159
160 static void action_comm_dup(const char *const *action)
161 {
162   double clock = smpi_process_simulated_elapsed();
163
164   log_timed_action (action, clock);
165 }
166
167 static void action_compute(const char *const *action)
168 {
169   double clock = smpi_process_simulated_elapsed();
170   double flops= parse_double(action[2]);
171 #ifdef HAVE_TRACING
172   int rank = smpi_comm_rank(MPI_COMM_WORLD);
173   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
174   extra->type=TRACING_COMPUTING;
175   extra->comp_size=flops;
176   TRACE_smpi_computing_in(rank, extra);
177 #endif
178   smpi_execute_flops(flops);
179 #ifdef HAVE_TRACING
180   TRACE_smpi_computing_out(rank);
181 #endif
182
183   log_timed_action (action, clock);
184 }
185
186 static void action_send(const char *const *action)
187 {
188   int to = atoi(action[2]);
189   double size=parse_double(action[3]);
190   double clock = smpi_process_simulated_elapsed();
191
192   if(action[4]) {
193     MPI_CURRENT_TYPE=decode_datatype(action[4]);
194   } else {
195     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
196   }
197
198 #ifdef HAVE_TRACING
199   int rank = smpi_comm_rank(MPI_COMM_WORLD);
200
201   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
202   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
203   extra->type = TRACING_SEND;
204   extra->send_size = size;
205   extra->src = rank;
206   extra->dst = dst_traced;
207   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
208   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
209   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
210 #endif
211
212   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
213
214   log_timed_action (action, clock);
215
216   #ifdef HAVE_TRACING
217   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
218 #endif
219
220 }
221
222 static void action_Isend(const char *const *action)
223 {
224   int to = atoi(action[2]);
225   double size=parse_double(action[3]);
226   double clock = smpi_process_simulated_elapsed();
227   MPI_Request request;
228
229   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
230   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
231
232 #ifdef HAVE_TRACING
233   int rank = smpi_comm_rank(MPI_COMM_WORLD);
234   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
235   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
236   extra->type = TRACING_ISEND;
237   extra->send_size = size;
238   extra->src = rank;
239   extra->dst = dst_traced;
240   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
241   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
242   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
243 #endif
244
245   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
246
247 #ifdef HAVE_TRACING
248   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
249   request->send = 1;
250 #endif
251
252   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
253
254   log_timed_action (action, clock);
255 }
256
257 static void action_recv(const char *const *action) {
258   int from = atoi(action[2]);
259   double size=parse_double(action[3]);
260   double clock = smpi_process_simulated_elapsed();
261   MPI_Status status;
262
263   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
264   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
265
266 #ifdef HAVE_TRACING
267   int rank = smpi_comm_rank(MPI_COMM_WORLD);
268   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
269
270   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
271   extra->type = TRACING_RECV;
272   extra->send_size = size;
273   extra->src = src_traced;
274   extra->dst = rank;
275   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
276   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
277 #endif
278
279   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
280
281 #ifdef HAVE_TRACING
282   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
283   TRACE_smpi_recv(rank, src_traced, rank);
284 #endif
285
286   log_timed_action (action, clock);
287 }
288
289 static void action_Irecv(const char *const *action)
290 {
291   int from = atoi(action[2]);
292   double size=parse_double(action[3]);
293   double clock = smpi_process_simulated_elapsed();
294   MPI_Request request;
295
296   smpi_replay_globals_t globals =
297      (smpi_replay_globals_t) smpi_process_get_user_data();
298
299   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
300   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
301
302 #ifdef HAVE_TRACING
303   int rank = smpi_comm_rank(MPI_COMM_WORLD);
304   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
305   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
306   extra->type = TRACING_IRECV;
307   extra->send_size = size;
308   extra->src = src_traced;
309   extra->dst = rank;
310   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
311   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
312 #endif
313
314   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
315
316 #ifdef HAVE_TRACING
317   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
318   request->recv = 1;
319 #endif
320   xbt_dynar_push(globals->irecvs,&request);
321   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
322
323   log_timed_action (action, clock);
324 }
325
326 static void action_wait(const char *const *action){
327   double clock = smpi_process_simulated_elapsed();
328   MPI_Request request;
329   MPI_Status status;
330   smpi_replay_globals_t globals =
331       (smpi_replay_globals_t) smpi_process_get_user_data();
332
333   xbt_assert(xbt_dynar_length(globals->irecvs),
334       "action wait not preceded by any irecv: %s",
335       xbt_str_join_array(action," "));
336   request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
337 #ifdef HAVE_TRACING
338   int rank = request && request->comm != MPI_COMM_NULL
339       ? smpi_comm_rank(request->comm)
340       : -1;
341
342   MPI_Group group = smpi_comm_group(request->comm);
343   int src_traced = smpi_group_rank(group, request->src);
344   int dst_traced = smpi_group_rank(group, request->dst);
345   int is_wait_for_receive = request->recv;
346   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
347   extra->type = TRACING_WAIT;
348   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
349 #endif
350   smpi_mpi_wait(&request, &status);
351 #ifdef HAVE_TRACING
352   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
353   if (is_wait_for_receive) {
354     TRACE_smpi_recv(rank, src_traced, dst_traced);
355   }
356 #endif
357
358   log_timed_action (action, clock);
359 }
360
361 static void action_waitall(const char *const *action){
362   double clock = smpi_process_simulated_elapsed();
363   int count_requests=0;
364   unsigned int i=0;
365
366   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
367
368   if (count_requests>0) {
369     MPI_Request requests[count_requests];
370     MPI_Status status[count_requests];
371
372     /*  The reqq is an array of dynars. Its index corresponds to the rank.
373      Thus each rank saves its own requests to the array request. */
374     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
375
376   #ifdef HAVE_TRACING
377    //save information from requests
378
379    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
380    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
381    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
382    for (i = 0; i < count_requests; i++) {
383     if(requests[i]){
384       int *asrc = xbt_new(int, 1);
385       int *adst = xbt_new(int, 1);
386       int *arecv = xbt_new(int, 1);
387       *asrc = requests[i]->src;
388       *adst = requests[i]->dst;
389       *arecv = requests[i]->recv;
390       xbt_dynar_insert_at(srcs, i, asrc);
391       xbt_dynar_insert_at(dsts, i, adst);
392       xbt_dynar_insert_at(recvs, i, arecv);
393       xbt_free(asrc);
394       xbt_free(adst);
395       xbt_free(arecv);
396     }else {
397       int *t = xbt_new(int, 1);
398       xbt_dynar_insert_at(srcs, i, t);
399       xbt_dynar_insert_at(dsts, i, t);
400       xbt_dynar_insert_at(recvs, i, t);
401       xbt_free(t);
402     }
403    }
404    int rank_traced = smpi_process_index();
405    instr_extra_data extra = xbt_new(s_instr_extra_data_t,1);
406    extra->type = TRACING_WAITALL;
407    extra->send_size=count_requests;
408    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
409  #endif
410
411     smpi_mpi_waitall(count_requests, requests, status);
412
413   #ifdef HAVE_TRACING
414    for (i = 0; i < count_requests; i++) {
415     int src_traced, dst_traced, is_wait_for_receive;
416     xbt_dynar_get_cpy(srcs, i, &src_traced);
417     xbt_dynar_get_cpy(dsts, i, &dst_traced);
418     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
419     if (is_wait_for_receive) {
420       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
421     }
422    }
423    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
424    //clean-up of dynars
425    xbt_dynar_free(&srcs);
426    xbt_dynar_free(&dsts);
427    xbt_dynar_free(&recvs);
428   #endif
429
430    xbt_dynar_reset(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
431   }
432   log_timed_action (action, clock);
433 }
434
435 static void action_barrier(const char *const *action){
436   double clock = smpi_process_simulated_elapsed();
437 #ifdef HAVE_TRACING
438   int rank = smpi_comm_rank(MPI_COMM_WORLD);
439   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
440   extra->type = TRACING_BARRIER;
441   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
442 #endif
443   smpi_mpi_barrier(MPI_COMM_WORLD);
444 #ifdef HAVE_TRACING
445   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
446 #endif
447
448   log_timed_action (action, clock);
449 }
450
451
452 static void action_bcast(const char *const *action)
453 {
454   double size = parse_double(action[2]);
455   double clock = smpi_process_simulated_elapsed();
456   int root=0;
457   /*
458    * Initialize MPI_CURRENT_TYPE in order to decrease
459    * the number of the checks
460    * */
461   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
462
463   if(action[3]) {
464     root= atoi(action[3]);
465     if(action[4]) {
466       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
467     }
468   }
469
470 #ifdef HAVE_TRACING
471   int rank = smpi_comm_rank(MPI_COMM_WORLD);
472   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
473
474   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
475   extra->type = TRACING_BCAST;
476   extra->send_size = size;
477   extra->root = root_traced;
478   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
479   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
480
481 #endif
482
483   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
484 #ifdef HAVE_TRACING
485   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
486 #endif
487
488   log_timed_action (action, clock);
489 }
490
491 static void action_reduce(const char *const *action)
492 {
493   double comm_size = parse_double(action[2]);
494   double comp_size = parse_double(action[3]);
495   double clock = smpi_process_simulated_elapsed();
496   int root=0;
497   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498
499   if(action[4]) {
500     root= atoi(action[4]);
501     if(action[5]) {
502       MPI_CURRENT_TYPE=decode_datatype(action[5]);
503     }
504   }
505
506 #ifdef HAVE_TRACING
507   int rank = smpi_comm_rank(MPI_COMM_WORLD);
508   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
509   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
510   extra->type = TRACING_REDUCE;
511   extra->send_size = comm_size;
512   extra->comp_size = comp_size;
513   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
514   extra->root = root_traced;
515
516   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
517 #endif
518    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
519    smpi_execute_flops(comp_size);
520 #ifdef HAVE_TRACING
521   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
522 #endif
523
524   log_timed_action (action, clock);
525 }
526
527 static void action_allReduce(const char *const *action) {
528   double comm_size = parse_double(action[2]);
529   double comp_size = parse_double(action[3]);
530
531   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
532   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
533
534   double clock = smpi_process_simulated_elapsed();
535 #ifdef HAVE_TRACING
536   int rank = smpi_comm_rank(MPI_COMM_WORLD);
537   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538   extra->type = TRACING_ALLREDUCE;
539   extra->send_size = comm_size;
540   extra->comp_size = comp_size;
541   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
542
543   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
544 #endif
545   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
546   smpi_execute_flops(comp_size);
547   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
548 #ifdef HAVE_TRACING
549   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
550 #endif
551
552   log_timed_action (action, clock);
553 }
554
555 static void action_allToAll(const char *const *action) {
556   double clock = smpi_process_simulated_elapsed();
557   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
558   int send_size = parse_double(action[2]);
559   int recv_size = parse_double(action[3]);
560   MPI_Datatype MPI_CURRENT_TYPE2;
561
562   if(action[4]) {
563     MPI_CURRENT_TYPE=decode_datatype(action[4]);
564     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
565   }
566   else {
567     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
568     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
569   }
570   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
571   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
572
573 #ifdef HAVE_TRACING
574   int rank = smpi_process_index();
575   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
576   extra->type = TRACING_ALLTOALL;
577   extra->send_size = send_size;
578   extra->recv_size = recv_size;
579   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
580   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
581
582   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
583 #endif
584
585   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
586
587 #ifdef HAVE_TRACING
588   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
589 #endif
590
591   log_timed_action (action, clock);
592   xbt_free(send);
593   xbt_free(recv);
594 }
595
596
597 static void action_gather(const char *const *action) {
598   /*
599  The structure of the gather action for the rank 0 (total 4 processes) 
600  is the following:   
601  0 gather 68 68 0 0 0
602
603   where: 
604   1) 68 is the sendcounts
605   2) 68 is the recvcounts
606   3) 0 is the root node
607   4) 0 is the send datatype id, see decode_datatype()
608   5) 0 is the recv datatype id, see decode_datatype()
609   */
610   double clock = smpi_process_simulated_elapsed();
611   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
612   int send_size = parse_double(action[2]);
613   int recv_size = parse_double(action[3]);
614   MPI_Datatype MPI_CURRENT_TYPE2;
615   if(action[5]) {
616     MPI_CURRENT_TYPE=decode_datatype(action[5]);
617     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
618   } else {
619     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
620     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
621   }
622   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
623   void *recv = calloc(recv_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
624
625   int root=atoi(action[4]);
626   int rank = smpi_process_index();
627
628   if(rank==root)
629     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
630
631 #ifdef HAVE_TRACING
632   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
633   extra->type = TRACING_GATHER;
634   extra->send_size = send_size;
635   extra->recv_size = recv_size;
636   extra->root = root;
637   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
638   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
639
640   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
641 #endif
642 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
643                 recv, recv_size, MPI_CURRENT_TYPE2,
644                 root, MPI_COMM_WORLD);
645
646 #ifdef HAVE_TRACING
647   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
648 #endif
649
650   log_timed_action (action, clock);
651   xbt_free(send);
652   xbt_free(recv);
653 }
654
655
656 static void action_reducescatter(const char *const *action) {
657
658     /*
659  The structure of the reducescatter action for the rank 0 (total 4 processes) 
660  is the following:   
661 0 reduceScatter 275427 275427 275427 204020 11346849 0
662
663   where: 
664   1) The first four values after the name of the action declare the recvcounts array
665   2) The value 11346849 is the amount of instructions
666   3) The last value corresponds to the datatype, see decode_datatype().
667
668   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
669
670    */
671
672   double clock = smpi_process_simulated_elapsed();
673   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
674   int comp_size = parse_double(action[2+comm_size]);
675   int *recvcounts = xbt_new0(int, comm_size);  
676   int *disps = xbt_new0(int, comm_size);  
677   int i=0,recv_sum=0;
678   int root=0;
679   int rank = smpi_process_index();
680
681   if(action[3+comm_size])
682     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
683   else
684     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
685
686   for(i=0;i<comm_size;i++) {
687     recvcounts[i] = atoi(action[i+2]);
688     recv_sum=recv_sum+recvcounts[i];
689     disps[i] = 0;
690   }
691
692 #ifdef HAVE_TRACING
693   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
694   extra->type = TRACING_REDUCE_SCATTER;
695   extra->send_size = 0;
696   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
697   for(i=0; i< comm_size; i++)//copy data to avoid bad free
698     extra->recvcounts[i] = recvcounts[i];
699   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
700   extra->comp_size = comp_size;
701   extra->num_processes = comm_size;
702
703
704   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
705 #endif
706    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
707        root, MPI_COMM_WORLD);
708    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
709                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
710    smpi_execute_flops(comp_size);
711
712
713 #ifdef HAVE_TRACING
714   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
715 #endif
716
717   log_timed_action (action, clock);
718 }
719
720
721 static void action_allgatherv(const char *const *action) {
722
723   /*
724  The structure of the allgatherv action for the rank 0 (total 4 processes) 
725  is the following:   
726 0 allGatherV 275427 275427 275427 275427 204020
727
728   where: 
729   1) 275427 is the sendcount
730   2) The next four elements declare the recvcounts array
731   3) No more values mean that the datatype for sent and receive buffer
732   is the default one, see decode_datatype().
733
734    */
735
736   double clock = smpi_process_simulated_elapsed();
737
738   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
739   int i=0;
740   int sendcount=atoi(action[2]);
741   int *recvcounts = xbt_new0(int, comm_size);  
742   int *disps = xbt_new0(int, comm_size);  
743   int recv_sum=0;  
744   MPI_Datatype MPI_CURRENT_TYPE2;
745
746   if(action[3+comm_size]) {
747     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
748     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
749   } else {
750     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
751     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
752   }
753   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
754
755   for(i=0;i<comm_size;i++) {
756     recvcounts[i] = atoi(action[i+3]);
757     recv_sum=recv_sum+recvcounts[i];
758   }
759   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
760
761 #ifdef HAVE_TRACING
762   int rank = smpi_process_index();
763   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
764   extra->type = TRACING_ALLGATHERV;
765   extra->send_size = sendcount;
766   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
767   for(i=0; i< comm_size; i++)//copy data to avoid bad free
768     extra->recvcounts[i] = recvcounts[i];
769   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
770   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
771   extra->num_processes = comm_size;
772
773   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
774 #endif
775
776 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
777
778 #ifdef HAVE_TRACING
779   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
780 #endif
781
782   log_timed_action (action, clock);
783   xbt_free(sendbuf);
784   xbt_free(recvbuf);
785   xbt_free(recvcounts);
786   xbt_free(disps);
787 }
788
789
790 static void action_allToAllv(const char *const *action) {
791   /*
792  The structure of the allToAllV action for the rank 0 (total 4 processes) 
793  is the following:   
794   0 allToAllV 100 1 7 10 12 100 1 70 10 5
795
796   where: 
797   1) 100 is the size of the send buffer *sizeof(int),
798   2) 1 7 10 12 is the sendcounts array
799   3) 100*sizeof(int) is the size of the receiver buffer
800   4)  1 70 10 5 is the recvcounts array
801
802    */
803
804
805   double clock = smpi_process_simulated_elapsed();
806
807   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
808   int send_buf_size=0,recv_buf_size=0,i=0;
809   int *sendcounts = xbt_new0(int, comm_size);  
810   int *recvcounts = xbt_new0(int, comm_size);  
811   int *senddisps = xbt_new0(int, comm_size);  
812   int *recvdisps = xbt_new0(int, comm_size);  
813
814   MPI_Datatype MPI_CURRENT_TYPE2;
815
816   send_buf_size=parse_double(action[2]);
817   recv_buf_size=parse_double(action[3+comm_size]);
818   if(action[4+2*comm_size]) {
819     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
820     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
821   }
822   else {
823       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
824       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
825   }
826
827   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
828   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
829
830   for(i=0;i<comm_size;i++) {
831     sendcounts[i] = atoi(action[i+3]);
832     recvcounts[i] = atoi(action[i+4+comm_size]);
833   }
834
835
836 #ifdef HAVE_TRACING
837   int rank = smpi_process_index();
838   int count=0;
839   for(i=0;i<comm_size;i++) count+=sendcounts[i];
840   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
841   extra->type = TRACING_ALLTOALLV;
842   extra->send_size = count;
843   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
844   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
845   extra->num_processes = comm_size;
846
847   for(i=0; i< comm_size; i++){//copy data to avoid bad free
848     extra->send_size += sendcounts[i];
849     extra->sendcounts[i] = sendcounts[i];
850     extra->recvcounts[i] = recvcounts[i];
851   }
852   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
853   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
854
855   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
856 #endif
857     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
858                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
859                                MPI_COMM_WORLD);
860 #ifdef HAVE_TRACING
861   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
862 #endif
863
864   log_timed_action (action, clock);
865   xbt_free(sendbuf);
866   xbt_free(recvbuf);
867   xbt_free(sendcounts);
868   xbt_free(recvcounts);
869   xbt_free(senddisps);
870   xbt_free(recvdisps);
871 }
872
873 void smpi_replay_init(int *argc, char***argv){
874   smpi_process_init(argc, argv);
875   smpi_process_mark_as_initialized();
876 #ifdef HAVE_TRACING
877   int rank = smpi_process_index();
878   TRACE_smpi_init(rank);
879   TRACE_smpi_computing_init(rank);
880   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
881   extra->type = TRACING_INIT;
882   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
883   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
884 #endif
885
886   if (!smpi_process_index()){
887     _xbt_replay_action_init();
888     xbt_replay_action_register("init",       action_init);
889     xbt_replay_action_register("finalize",   action_finalize);
890     xbt_replay_action_register("comm_size",  action_comm_size);
891     xbt_replay_action_register("comm_split", action_comm_split);
892     xbt_replay_action_register("comm_dup",   action_comm_dup);
893     xbt_replay_action_register("send",       action_send);
894     xbt_replay_action_register("Isend",      action_Isend);
895     xbt_replay_action_register("recv",       action_recv);
896     xbt_replay_action_register("Irecv",      action_Irecv);
897     xbt_replay_action_register("wait",       action_wait);
898     xbt_replay_action_register("waitAll",    action_waitall);
899     xbt_replay_action_register("barrier",    action_barrier);
900     xbt_replay_action_register("bcast",      action_bcast);
901     xbt_replay_action_register("reduce",     action_reduce);
902     xbt_replay_action_register("allReduce",  action_allReduce);
903     xbt_replay_action_register("allToAll",   action_allToAll);
904     xbt_replay_action_register("allToAllV",  action_allToAllv);
905     xbt_replay_action_register("gather",  action_gather);
906     xbt_replay_action_register("allGatherV",  action_allgatherv);
907     xbt_replay_action_register("reduceScatter",  action_reducescatter);
908     xbt_replay_action_register("compute",    action_compute);
909   }
910
911   xbt_replay_action_runner(*argc, *argv);
912 }
913
914 int smpi_replay_finalize(){
915   double sim_time= 1.;
916   /* One active process will stop. Decrease the counter*/
917   active_processes--;
918   XBT_DEBUG("There are %lu elements in reqq[*]",
919             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
920   xbt_dynar_free(&reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
921   if(!active_processes){
922     /* Last process alive speaking */
923     /* end the simulated timer */
924     sim_time = smpi_process_simulated_elapsed();
925     XBT_INFO("Simulation time %g", sim_time);
926     _xbt_replay_action_exit();
927     xbt_free(reqq);
928     reqq = NULL;
929   }
930   smpi_mpi_barrier(MPI_COMM_WORLD);
931 #ifdef HAVE_TRACING
932   int rank = smpi_process_index();
933   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
934   extra->type = TRACING_FINALIZE;
935   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
936 #endif
937   smpi_process_finalize();
938 #ifdef HAVE_TRACING
939   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
940   TRACE_smpi_finalize(smpi_process_index());
941 #endif
942   smpi_process_destroy();
943   return MPI_SUCCESS;
944 }