Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e8f0ada9ad38942cb3faee288ca8bc5c0df0725a
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static void log_timed_action (const char *const *action, double clock){
23   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24     char *name = xbt_str_join_array(action, " ");
25     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
26     free(name);
27   }
28 }
29
30 typedef struct {
31   xbt_dynar_t irecvs; /* of MPI_Request */
32 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
33
34
35 /* Helper function */
36 static double parse_double(const char *string)
37 {
38   double value;
39   char *endptr;
40   value = strtod(string, &endptr);
41   if (*endptr != '\0')
42     THROWF(unknown_error, 0, "%s is not a double", string);
43   return value;
44 }
45
46 static MPI_Datatype decode_datatype(const char *const action)
47 {
48 // Declared datatypes,
49
50   switch(atoi(action))
51   {
52     case 0:
53       MPI_CURRENT_TYPE=MPI_DOUBLE;
54       break;
55     case 1:
56       MPI_CURRENT_TYPE=MPI_INT;
57       break;
58     case 2:
59       MPI_CURRENT_TYPE=MPI_CHAR;
60       break;
61     case 3:
62       MPI_CURRENT_TYPE=MPI_SHORT;
63       break;
64     case 4:
65       MPI_CURRENT_TYPE=MPI_LONG;
66       break;
67     case 5:
68       MPI_CURRENT_TYPE=MPI_FLOAT;
69       break;
70     case 6:
71       MPI_CURRENT_TYPE=MPI_BYTE;
72       break;
73     default:
74       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
75
76   }
77    return MPI_CURRENT_TYPE;
78 }
79
80
81 const char* encode_datatype(MPI_Datatype datatype)
82 {
83
84   //default type for output is set to MPI_BYTE
85   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
86   if (datatype==MPI_BYTE){
87       return "";
88   }
89   if(datatype==MPI_DOUBLE)
90       return "0";
91   if(datatype==MPI_INT)
92       return "1";
93   if(datatype==MPI_CHAR)
94       return "2";
95   if(datatype==MPI_SHORT)
96       return "3";
97   if(datatype==MPI_LONG)
98     return "4";
99   if(datatype==MPI_FLOAT)
100       return "5";
101
102   // default - not implemented.
103   // do not warn here as we pass in this function even for other trace formats
104   return "-1";
105 }
106
107 static void action_init(const char *const *action)
108 {
109   int i;
110   XBT_DEBUG("Initialize the counters");
111   smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
112   globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
113
114   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
115   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
116
117   smpi_process_set_user_data((void*) globals);
118
119   /* start a simulated timer */
120   smpi_process_simulated_start();
121   /*initialize the number of active processes */
122   active_processes = smpi_process_count();
123
124   if (!reqq) {
125     reqq=xbt_new0(xbt_dynar_t,active_processes);
126
127     for(i=0;i<active_processes;i++){
128       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),NULL);
129     }
130   }
131 }
132
133 static void action_finalize(const char *const *action)
134 {
135   smpi_replay_globals_t globals =
136       (smpi_replay_globals_t) smpi_process_get_user_data();
137   if (globals){
138     XBT_DEBUG("There are %lu irecvs in the dynar",
139          xbt_dynar_length(globals->irecvs));
140     xbt_dynar_free_container(&(globals->irecvs));
141   }
142   free(globals);
143 }
144
145 static void action_comm_size(const char *const *action)
146 {
147   double clock = smpi_process_simulated_elapsed();
148
149   communicator_size = parse_double(action[2]);
150   log_timed_action (action, clock);
151 }
152
153 static void action_comm_split(const char *const *action)
154 {
155   double clock = smpi_process_simulated_elapsed();
156
157   log_timed_action (action, clock);
158 }
159
160 static void action_comm_dup(const char *const *action)
161 {
162   double clock = smpi_process_simulated_elapsed();
163
164   log_timed_action (action, clock);
165 }
166
167 static void action_compute(const char *const *action)
168 {
169   double clock = smpi_process_simulated_elapsed();
170   double flops= parse_double(action[2]);
171 #ifdef HAVE_TRACING
172   int rank = smpi_comm_rank(MPI_COMM_WORLD);
173   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
174   extra->type=TRACING_COMPUTING;
175   extra->comp_size=flops;
176   TRACE_smpi_computing_in(rank, extra);
177 #endif
178   smpi_execute_flops(flops);
179 #ifdef HAVE_TRACING
180   TRACE_smpi_computing_out(rank);
181 #endif
182
183   log_timed_action (action, clock);
184 }
185
186 static void action_send(const char *const *action)
187 {
188   int to = atoi(action[2]);
189   double size=parse_double(action[3]);
190   double clock = smpi_process_simulated_elapsed();
191
192   if(action[4]) {
193     MPI_CURRENT_TYPE=decode_datatype(action[4]);
194   } else {
195     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
196   }
197
198 #ifdef HAVE_TRACING
199   int rank = smpi_comm_rank(MPI_COMM_WORLD);
200
201   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
202   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
203   extra->type = TRACING_SEND;
204   extra->send_size = size;
205   extra->src = rank;
206   extra->dst = dst_traced;
207   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
208   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
209   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
210 #endif
211
212   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
213
214   log_timed_action (action, clock);
215
216   #ifdef HAVE_TRACING
217   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
218 #endif
219
220 }
221
222 static void action_Isend(const char *const *action)
223 {
224   int to = atoi(action[2]);
225   double size=parse_double(action[3]);
226   double clock = smpi_process_simulated_elapsed();
227   MPI_Request request;
228
229   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
230   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
231
232 #ifdef HAVE_TRACING
233   int rank = smpi_comm_rank(MPI_COMM_WORLD);
234   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
235   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
236   extra->type = TRACING_ISEND;
237   extra->send_size = size;
238   extra->src = rank;
239   extra->dst = dst_traced;
240   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
241   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
242   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
243 #endif
244
245   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
246
247 #ifdef HAVE_TRACING
248   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
249   request->send = 1;
250 #endif
251
252   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
253
254   log_timed_action (action, clock);
255 }
256
257 static void action_recv(const char *const *action) {
258   int from = atoi(action[2]);
259   double size=parse_double(action[3]);
260   double clock = smpi_process_simulated_elapsed();
261   MPI_Status status;
262
263   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
264   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
265
266 #ifdef HAVE_TRACING
267   int rank = smpi_comm_rank(MPI_COMM_WORLD);
268   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
269
270   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
271   extra->type = TRACING_RECV;
272   extra->send_size = size;
273   extra->src = src_traced;
274   extra->dst = rank;
275   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
276   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
277 #endif
278
279   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
280
281 #ifdef HAVE_TRACING
282   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
283   TRACE_smpi_recv(rank, src_traced, rank);
284 #endif
285
286   log_timed_action (action, clock);
287 }
288
289 static void action_Irecv(const char *const *action)
290 {
291   int from = atoi(action[2]);
292   double size=parse_double(action[3]);
293   double clock = smpi_process_simulated_elapsed();
294   MPI_Request request;
295
296   smpi_replay_globals_t globals =
297      (smpi_replay_globals_t) smpi_process_get_user_data();
298
299   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
300   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
301
302 #ifdef HAVE_TRACING
303   int rank = smpi_comm_rank(MPI_COMM_WORLD);
304   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
305   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
306   extra->type = TRACING_IRECV;
307   extra->send_size = size;
308   extra->src = src_traced;
309   extra->dst = rank;
310   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
311   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
312 #endif
313
314   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
315
316 #ifdef HAVE_TRACING
317   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
318   request->recv = 1;
319 #endif
320   xbt_dynar_push(globals->irecvs,&request);
321   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
322
323   log_timed_action (action, clock);
324 }
325
326 static void action_wait(const char *const *action){
327   double clock = smpi_process_simulated_elapsed();
328   MPI_Request request;
329   MPI_Status status;
330   smpi_replay_globals_t globals =
331       (smpi_replay_globals_t) smpi_process_get_user_data();
332
333   xbt_assert(xbt_dynar_length(globals->irecvs),
334       "action wait not preceded by any irecv: %s",
335       xbt_str_join_array(action," "));
336   request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
337 #ifdef HAVE_TRACING
338   int rank = request && request->comm != MPI_COMM_NULL
339       ? smpi_comm_rank(request->comm)
340       : -1;
341
342   MPI_Group group = smpi_comm_group(request->comm);
343   int src_traced = smpi_group_rank(group, request->src);
344   int dst_traced = smpi_group_rank(group, request->dst);
345   int is_wait_for_receive = request->recv;
346   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
347   extra->type = TRACING_WAIT;
348   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
349 #endif
350   smpi_mpi_wait(&request, &status);
351 #ifdef HAVE_TRACING
352   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
353   if (is_wait_for_receive) {
354     TRACE_smpi_recv(rank, src_traced, dst_traced);
355   }
356 #endif
357
358   log_timed_action (action, clock);
359 }
360
361 static void action_waitall(const char *const *action){
362   double clock = smpi_process_simulated_elapsed();
363   int count_requests=0;
364   unsigned int i=0;
365
366   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
367
368   if (count_requests>0) {
369     MPI_Request requests[count_requests];
370     MPI_Status status[count_requests];
371
372     /*  The reqq is an array of dynars. Its index corresponds to the rank.
373      Thus each rank saves its own requests to the array request. */
374     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
375
376   #ifdef HAVE_TRACING
377    //save information from requests
378
379    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
380    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
381    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
382    for (i = 0; i < count_requests; i++) {
383     if(requests[i]){
384       int *asrc = xbt_new(int, 1);
385       int *adst = xbt_new(int, 1);
386       int *arecv = xbt_new(int, 1);
387       *asrc = requests[i]->src;
388       *adst = requests[i]->dst;
389       *arecv = requests[i]->recv;
390       xbt_dynar_insert_at(srcs, i, asrc);
391       xbt_dynar_insert_at(dsts, i, adst);
392       xbt_dynar_insert_at(recvs, i, arecv);
393       xbt_free(asrc);
394       xbt_free(adst);
395       xbt_free(arecv);
396     }else {
397       int *t = xbt_new(int, 1);
398       xbt_dynar_insert_at(srcs, i, t);
399       xbt_dynar_insert_at(dsts, i, t);
400       xbt_dynar_insert_at(recvs, i, t);
401       xbt_free(t);
402     }
403    }
404    int rank_traced = smpi_process_index();
405    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
406    extra->type = TRACING_WAITALL;
407    extra->send_size=count_requests;
408    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
409  #endif
410
411     smpi_mpi_waitall(count_requests, requests, status);
412
413   #ifdef HAVE_TRACING
414    for (i = 0; i < count_requests; i++) {
415     int src_traced, dst_traced, is_wait_for_receive;
416     xbt_dynar_get_cpy(srcs, i, &src_traced);
417     xbt_dynar_get_cpy(dsts, i, &dst_traced);
418     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
419     if (is_wait_for_receive) {
420       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
421     }
422    }
423    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
424    //clean-up of dynars
425    xbt_dynar_free(&srcs);
426    xbt_dynar_free(&dsts);
427    xbt_dynar_free(&recvs);
428   #endif
429
430    xbt_dynar_reset(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
431   }
432   log_timed_action (action, clock);
433 }
434
435 static void action_barrier(const char *const *action){
436   double clock = smpi_process_simulated_elapsed();
437 #ifdef HAVE_TRACING
438   int rank = smpi_comm_rank(MPI_COMM_WORLD);
439   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
440   extra->type = TRACING_BARRIER;
441   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
442 #endif
443   smpi_mpi_barrier(MPI_COMM_WORLD);
444 #ifdef HAVE_TRACING
445   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
446 #endif
447
448   log_timed_action (action, clock);
449 }
450
451
452 static void action_bcast(const char *const *action)
453 {
454   double size = parse_double(action[2]);
455   double clock = smpi_process_simulated_elapsed();
456   int root=0;
457   /*
458    * Initialize MPI_CURRENT_TYPE in order to decrease
459    * the number of the checks
460    * */
461   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
462
463   if(action[3]) {
464     root= atoi(action[3]);
465     if(action[4]) {
466       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
467     }
468   }
469
470 #ifdef HAVE_TRACING
471   int rank = smpi_comm_rank(MPI_COMM_WORLD);
472   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
473
474   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
475   extra->type = TRACING_BCAST;
476   extra->send_size = size;
477   extra->root = root_traced;
478   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
479   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
480
481 #endif
482
483   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
484 #ifdef HAVE_TRACING
485   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
486 #endif
487
488   log_timed_action (action, clock);
489 }
490
491 static void action_reduce(const char *const *action)
492 {
493   double comm_size = parse_double(action[2]);
494   double comp_size = parse_double(action[3]);
495   double clock = smpi_process_simulated_elapsed();
496   int root=0;
497   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498
499   if(action[4]) {
500     root= atoi(action[4]);
501     if(action[5]) {
502       MPI_CURRENT_TYPE=decode_datatype(action[5]);
503     }
504   }
505
506 #ifdef HAVE_TRACING
507   int rank = smpi_comm_rank(MPI_COMM_WORLD);
508   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
509   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
510   extra->type = TRACING_REDUCE;
511   extra->send_size = comm_size;
512   extra->comp_size = comp_size;
513   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
514   extra->root = root_traced;
515
516   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
517 #endif
518    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
519    smpi_execute_flops(comp_size);
520 #ifdef HAVE_TRACING
521   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
522 #endif
523
524   log_timed_action (action, clock);
525 }
526
527 static void action_allReduce(const char *const *action) {
528   double comm_size = parse_double(action[2]);
529   double comp_size = parse_double(action[3]);
530
531   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
532   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
533
534   double clock = smpi_process_simulated_elapsed();
535 #ifdef HAVE_TRACING
536   int rank = smpi_comm_rank(MPI_COMM_WORLD);
537   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538   extra->type = TRACING_ALLREDUCE;
539   extra->send_size = comm_size;
540   extra->comp_size = comp_size;
541   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
542
543   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
544 #endif
545   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
546   smpi_execute_flops(comp_size);
547   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
548 #ifdef HAVE_TRACING
549   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
550 #endif
551
552   log_timed_action (action, clock);
553 }
554
555 static void action_allToAll(const char *const *action) {
556   double clock = smpi_process_simulated_elapsed();
557   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
558   int send_size = parse_double(action[2]);
559   int recv_size = parse_double(action[3]);
560   MPI_Datatype MPI_CURRENT_TYPE2;
561
562   if(action[4]) {
563     MPI_CURRENT_TYPE=decode_datatype(action[4]);
564     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
565   }
566   else {
567     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
568     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
569   }
570   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
571   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
572
573 #ifdef HAVE_TRACING
574   int rank = smpi_process_index();
575   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
576   extra->type = TRACING_ALLTOALL;
577   extra->send_size = send_size;
578   extra->recv_size = recv_size;
579   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
580   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
581
582   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
583 #endif
584
585   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
586
587 #ifdef HAVE_TRACING
588   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
589 #endif
590
591   log_timed_action (action, clock);
592   xbt_free(send);
593   xbt_free(recv);
594 }
595
596
597 static void action_gather(const char *const *action) {
598   /*
599  The structure of the gather action for the rank 0 (total 4 processes) 
600  is the following:   
601  0 gather 68 68 0 0 0
602
603   where: 
604   1) 68 is the sendcounts
605   2) 68 is the recvcounts
606   3) 0 is the root node
607   4) 0 is the send datatype id, see decode_datatype()
608   5) 0 is the recv datatype id, see decode_datatype()
609   */
610   double clock = smpi_process_simulated_elapsed();
611   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
612   int send_size = parse_double(action[2]);
613   int recv_size = parse_double(action[3]);
614   MPI_Datatype MPI_CURRENT_TYPE2;
615   if(action[5]) {
616     MPI_CURRENT_TYPE=decode_datatype(action[5]);
617     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
618   } else {
619     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
620     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
621   }
622   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
623   void *recv = NULL;
624
625   int root=atoi(action[4]);
626   int rank = smpi_process_index();
627
628   if(rank==root)
629     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
630
631 #ifdef HAVE_TRACING
632   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
633   extra->type = TRACING_GATHER;
634   extra->send_size = send_size;
635   extra->recv_size = recv_size;
636   extra->root = root;
637   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
638   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
639
640   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
641 #endif
642 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
643                 recv, recv_size, MPI_CURRENT_TYPE2,
644                 root, MPI_COMM_WORLD);
645
646 #ifdef HAVE_TRACING
647   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
648 #endif
649
650   log_timed_action (action, clock);
651   xbt_free(send);
652   xbt_free(recv);
653 }
654
655
656
657 static void action_gatherv(const char *const *action) {
658   /*
659  The structure of the gatherv action for the rank 0 (total 4 processes)
660  is the following:
661  0 gather 68 68 10 10 10 0 0 0
662
663   where:
664   1) 68 is the sendcount
665   2) 68 10 10 10 is the recvcounts
666   3) 0 is the root node
667   4) 0 is the send datatype id, see decode_datatype()
668   5) 0 is the recv datatype id, see decode_datatype()
669   */
670   double clock = smpi_process_simulated_elapsed();
671   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
672   int send_size = parse_double(action[2]);
673   int *disps = xbt_new0(int, comm_size);
674   int *recvcounts = xbt_new0(int, comm_size);
675   int i=0,recv_sum=0;
676
677   MPI_Datatype MPI_CURRENT_TYPE2;
678   if(action[4+comm_size]) {
679     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
680     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
681   } else {
682     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
683     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
684   }
685   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
686   void *recv = NULL;
687   for(i=0;i<comm_size;i++) {
688     recvcounts[i] = atoi(action[i+3]);
689     recv_sum=recv_sum+recvcounts[i];
690     disps[i] = 0;
691   }
692
693   int root=atoi(action[3+comm_size]);
694   int rank = smpi_process_index();
695
696   if(rank==root)
697     recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
698
699 #ifdef HAVE_TRACING
700   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
701   extra->type = TRACING_GATHERV;
702   extra->send_size = send_size;
703   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
704   for(i=0; i< comm_size; i++)//copy data to avoid bad free
705     extra->recvcounts[i] = recvcounts[i];
706   extra->root = root;
707   extra->num_processes = comm_size;
708   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
709   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
710
711   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
712 #endif
713 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
714                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
715                 root, MPI_COMM_WORLD);
716
717 #ifdef HAVE_TRACING
718   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
719 #endif
720
721   log_timed_action (action, clock);
722   xbt_free(recvcounts);
723   xbt_free(send);
724   if(recv)xbt_free(recv);
725   xbt_free(disps);
726
727 }
728
729 static void action_reducescatter(const char *const *action) {
730
731     /*
732  The structure of the reducescatter action for the rank 0 (total 4 processes) 
733  is the following:   
734 0 reduceScatter 275427 275427 275427 204020 11346849 0
735
736   where: 
737   1) The first four values after the name of the action declare the recvcounts array
738   2) The value 11346849 is the amount of instructions
739   3) The last value corresponds to the datatype, see decode_datatype().
740
741   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
742
743    */
744
745   double clock = smpi_process_simulated_elapsed();
746   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
747   int comp_size = parse_double(action[2+comm_size]);
748   int *recvcounts = xbt_new0(int, comm_size);  
749   int *disps = xbt_new0(int, comm_size);  
750   int i=0,recv_sum=0;
751   int root=0;
752   int rank = smpi_process_index();
753
754   if(action[3+comm_size])
755     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
756   else
757     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
758
759   for(i=0;i<comm_size;i++) {
760     recvcounts[i] = atoi(action[i+2]);
761     recv_sum=recv_sum+recvcounts[i];
762     disps[i] = 0;
763   }
764
765 #ifdef HAVE_TRACING
766   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
767   extra->type = TRACING_REDUCE_SCATTER;
768   extra->send_size = 0;
769   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
770   for(i=0; i< comm_size; i++)//copy data to avoid bad free
771     extra->recvcounts[i] = recvcounts[i];
772   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
773   extra->comp_size = comp_size;
774   extra->num_processes = comm_size;
775
776
777   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
778 #endif
779    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
780        root, MPI_COMM_WORLD);
781    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
782                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
783    smpi_execute_flops(comp_size);
784
785
786 #ifdef HAVE_TRACING
787   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
788 #endif
789   xbt_free(recvcounts);
790   xbt_free(disps);
791   log_timed_action (action, clock);
792 }
793
794
795 static void action_allgatherv(const char *const *action) {
796
797   /*
798  The structure of the allgatherv action for the rank 0 (total 4 processes) 
799  is the following:   
800 0 allGatherV 275427 275427 275427 275427 204020
801
802   where: 
803   1) 275427 is the sendcount
804   2) The next four elements declare the recvcounts array
805   3) No more values mean that the datatype for sent and receive buffer
806   is the default one, see decode_datatype().
807
808    */
809
810   double clock = smpi_process_simulated_elapsed();
811
812   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
813   int i=0;
814   int sendcount=atoi(action[2]);
815   int *recvcounts = xbt_new0(int, comm_size);  
816   int *disps = xbt_new0(int, comm_size);  
817   int recv_sum=0;  
818   MPI_Datatype MPI_CURRENT_TYPE2;
819
820   if(action[3+comm_size]) {
821     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
822     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
823   } else {
824     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
825     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
826   }
827   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
828
829   for(i=0;i<comm_size;i++) {
830     recvcounts[i] = atoi(action[i+3]);
831     recv_sum=recv_sum+recvcounts[i];
832   }
833   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
834
835 #ifdef HAVE_TRACING
836   int rank = smpi_process_index();
837   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
838   extra->type = TRACING_ALLGATHERV;
839   extra->send_size = sendcount;
840   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
841   for(i=0; i< comm_size; i++)//copy data to avoid bad free
842     extra->recvcounts[i] = recvcounts[i];
843   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
844   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
845   extra->num_processes = comm_size;
846
847   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
848 #endif
849
850 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
851
852 #ifdef HAVE_TRACING
853   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
854 #endif
855
856   log_timed_action (action, clock);
857   xbt_free(sendbuf);
858   xbt_free(recvbuf);
859   xbt_free(recvcounts);
860   xbt_free(disps);
861 }
862
863
864 static void action_allToAllv(const char *const *action) {
865   /*
866  The structure of the allToAllV action for the rank 0 (total 4 processes) 
867  is the following:   
868   0 allToAllV 100 1 7 10 12 100 1 70 10 5
869
870   where: 
871   1) 100 is the size of the send buffer *sizeof(int),
872   2) 1 7 10 12 is the sendcounts array
873   3) 100*sizeof(int) is the size of the receiver buffer
874   4)  1 70 10 5 is the recvcounts array
875
876    */
877
878
879   double clock = smpi_process_simulated_elapsed();
880
881   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
882   int send_buf_size=0,recv_buf_size=0,i=0;
883   int *sendcounts = xbt_new0(int, comm_size);  
884   int *recvcounts = xbt_new0(int, comm_size);  
885   int *senddisps = xbt_new0(int, comm_size);  
886   int *recvdisps = xbt_new0(int, comm_size);  
887
888   MPI_Datatype MPI_CURRENT_TYPE2;
889
890   send_buf_size=parse_double(action[2]);
891   recv_buf_size=parse_double(action[3+comm_size]);
892   if(action[4+2*comm_size]) {
893     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
894     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
895   }
896   else {
897       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
898       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
899   }
900
901   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
902   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
903
904   for(i=0;i<comm_size;i++) {
905     sendcounts[i] = atoi(action[i+3]);
906     recvcounts[i] = atoi(action[i+4+comm_size]);
907   }
908
909
910 #ifdef HAVE_TRACING
911   int rank = smpi_process_index();
912   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
913   extra->type = TRACING_ALLTOALLV;
914   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
915   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
916   extra->num_processes = comm_size;
917
918   for(i=0; i< comm_size; i++){//copy data to avoid bad free
919     extra->send_size += sendcounts[i];
920     extra->sendcounts[i] = sendcounts[i];
921     extra->recv_size += recvcounts[i];
922     extra->recvcounts[i] = recvcounts[i];
923   }
924   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
925   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
926
927   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
928 #endif
929     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
930                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
931                                MPI_COMM_WORLD);
932 #ifdef HAVE_TRACING
933   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
934 #endif
935
936   log_timed_action (action, clock);
937   xbt_free(sendbuf);
938   xbt_free(recvbuf);
939   xbt_free(sendcounts);
940   xbt_free(recvcounts);
941   xbt_free(senddisps);
942   xbt_free(recvdisps);
943 }
944
945 void smpi_replay_init(int *argc, char***argv){
946   smpi_process_init(argc, argv);
947   smpi_process_mark_as_initialized();
948 #ifdef HAVE_TRACING
949   int rank = smpi_process_index();
950   TRACE_smpi_init(rank);
951   TRACE_smpi_computing_init(rank);
952   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
953   extra->type = TRACING_INIT;
954   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
955   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
956 #endif
957
958   if (!smpi_process_index()){
959     _xbt_replay_action_init();
960     xbt_replay_action_register("init",       action_init);
961     xbt_replay_action_register("finalize",   action_finalize);
962     xbt_replay_action_register("comm_size",  action_comm_size);
963     xbt_replay_action_register("comm_split", action_comm_split);
964     xbt_replay_action_register("comm_dup",   action_comm_dup);
965     xbt_replay_action_register("send",       action_send);
966     xbt_replay_action_register("Isend",      action_Isend);
967     xbt_replay_action_register("recv",       action_recv);
968     xbt_replay_action_register("Irecv",      action_Irecv);
969     xbt_replay_action_register("wait",       action_wait);
970     xbt_replay_action_register("waitAll",    action_waitall);
971     xbt_replay_action_register("barrier",    action_barrier);
972     xbt_replay_action_register("bcast",      action_bcast);
973     xbt_replay_action_register("reduce",     action_reduce);
974     xbt_replay_action_register("allReduce",  action_allReduce);
975     xbt_replay_action_register("allToAll",   action_allToAll);
976     xbt_replay_action_register("allToAllV",  action_allToAllv);
977     xbt_replay_action_register("gather",  action_gather);
978     xbt_replay_action_register("gatherV",  action_gatherv);
979     xbt_replay_action_register("allGatherV",  action_allgatherv);
980     xbt_replay_action_register("reduceScatter",  action_reducescatter);
981     xbt_replay_action_register("compute",    action_compute);
982   }
983
984   xbt_replay_action_runner(*argc, *argv);
985 }
986
987 int smpi_replay_finalize(){
988   double sim_time= 1.;
989   /* One active process will stop. Decrease the counter*/
990   active_processes--;
991   XBT_DEBUG("There are %lu elements in reqq[*]",
992             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
993   xbt_dynar_free(&reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
994   if(!active_processes){
995     /* Last process alive speaking */
996     /* end the simulated timer */
997     sim_time = smpi_process_simulated_elapsed();
998     XBT_INFO("Simulation time %g", sim_time);
999     _xbt_replay_action_exit();
1000     xbt_free(reqq);
1001     reqq = NULL;
1002   }
1003   smpi_mpi_barrier(MPI_COMM_WORLD);
1004 #ifdef HAVE_TRACING
1005   int rank = smpi_process_index();
1006   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1007   extra->type = TRACING_FINALIZE;
1008   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1009 #endif
1010   smpi_process_finalize();
1011 #ifdef HAVE_TRACING
1012   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1013   TRACE_smpi_finalize(smpi_process_index());
1014 #endif
1015   smpi_process_destroy();
1016   return MPI_SUCCESS;
1017 }