Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add gatherv replay action (bigdft can be replayed easily now)
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static void log_timed_action (const char *const *action, double clock){
23   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24     char *name = xbt_str_join_array(action, " ");
25     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
26     free(name);
27   }
28 }
29
30 typedef struct {
31   xbt_dynar_t irecvs; /* of MPI_Request */
32 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
33
34
35 /* Helper function */
36 static double parse_double(const char *string)
37 {
38   double value;
39   char *endptr;
40   value = strtod(string, &endptr);
41   if (*endptr != '\0')
42     THROWF(unknown_error, 0, "%s is not a double", string);
43   return value;
44 }
45
46 static MPI_Datatype decode_datatype(const char *const action)
47 {
48 // Declared datatypes,
49
50   switch(atoi(action))
51   {
52     case 0:
53       MPI_CURRENT_TYPE=MPI_DOUBLE;
54       break;
55     case 1:
56       MPI_CURRENT_TYPE=MPI_INT;
57       break;
58     case 2:
59       MPI_CURRENT_TYPE=MPI_CHAR;
60       break;
61     case 3:
62       MPI_CURRENT_TYPE=MPI_SHORT;
63       break;
64     case 4:
65       MPI_CURRENT_TYPE=MPI_LONG;
66       break;
67     case 5:
68       MPI_CURRENT_TYPE=MPI_FLOAT;
69       break;
70     case 6:
71       MPI_CURRENT_TYPE=MPI_BYTE;
72       break;
73     default:
74       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
75
76   }
77    return MPI_CURRENT_TYPE;
78 }
79
80
81 const char* encode_datatype(MPI_Datatype datatype)
82 {
83
84   //default type for output is set to MPI_BYTE
85   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
86   if (datatype==MPI_BYTE){
87       return "";
88   }
89   if(datatype==MPI_DOUBLE)
90       return "0";
91   if(datatype==MPI_INT)
92       return "1";
93   if(datatype==MPI_CHAR)
94       return "2";
95   if(datatype==MPI_SHORT)
96       return "3";
97   if(datatype==MPI_LONG)
98     return "4";
99   if(datatype==MPI_FLOAT)
100       return "5";
101
102   // default - not implemented.
103   // do not warn here as we pass in this function even for other trace formats
104   return "-1";
105 }
106
107 static void action_init(const char *const *action)
108 {
109   int i;
110   XBT_DEBUG("Initialize the counters");
111   smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
112   globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
113
114   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
115   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
116
117   smpi_process_set_user_data((void*) globals);
118
119   /* start a simulated timer */
120   smpi_process_simulated_start();
121   /*initialize the number of active processes */
122   active_processes = smpi_process_count();
123
124   if (!reqq) {
125     reqq=xbt_new0(xbt_dynar_t,active_processes);
126
127     for(i=0;i<active_processes;i++){
128       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),NULL);
129     }
130   }
131 }
132
133 static void action_finalize(const char *const *action)
134 {
135   smpi_replay_globals_t globals =
136       (smpi_replay_globals_t) smpi_process_get_user_data();
137   if (globals){
138     XBT_DEBUG("There are %lu irecvs in the dynar",
139          xbt_dynar_length(globals->irecvs));
140     xbt_dynar_free_container(&(globals->irecvs));
141   }
142   free(globals);
143 }
144
145 static void action_comm_size(const char *const *action)
146 {
147   double clock = smpi_process_simulated_elapsed();
148
149   communicator_size = parse_double(action[2]);
150   log_timed_action (action, clock);
151 }
152
153 static void action_comm_split(const char *const *action)
154 {
155   double clock = smpi_process_simulated_elapsed();
156
157   log_timed_action (action, clock);
158 }
159
160 static void action_comm_dup(const char *const *action)
161 {
162   double clock = smpi_process_simulated_elapsed();
163
164   log_timed_action (action, clock);
165 }
166
167 static void action_compute(const char *const *action)
168 {
169   double clock = smpi_process_simulated_elapsed();
170   double flops= parse_double(action[2]);
171 #ifdef HAVE_TRACING
172   int rank = smpi_comm_rank(MPI_COMM_WORLD);
173   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
174   extra->type=TRACING_COMPUTING;
175   extra->comp_size=flops;
176   TRACE_smpi_computing_in(rank, extra);
177 #endif
178   smpi_execute_flops(flops);
179 #ifdef HAVE_TRACING
180   TRACE_smpi_computing_out(rank);
181 #endif
182
183   log_timed_action (action, clock);
184 }
185
186 static void action_send(const char *const *action)
187 {
188   int to = atoi(action[2]);
189   double size=parse_double(action[3]);
190   double clock = smpi_process_simulated_elapsed();
191
192   if(action[4]) {
193     MPI_CURRENT_TYPE=decode_datatype(action[4]);
194   } else {
195     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
196   }
197
198 #ifdef HAVE_TRACING
199   int rank = smpi_comm_rank(MPI_COMM_WORLD);
200
201   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
202   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
203   extra->type = TRACING_SEND;
204   extra->send_size = size;
205   extra->src = rank;
206   extra->dst = dst_traced;
207   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
208   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
209   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
210 #endif
211
212   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
213
214   log_timed_action (action, clock);
215
216   #ifdef HAVE_TRACING
217   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
218 #endif
219
220 }
221
222 static void action_Isend(const char *const *action)
223 {
224   int to = atoi(action[2]);
225   double size=parse_double(action[3]);
226   double clock = smpi_process_simulated_elapsed();
227   MPI_Request request;
228
229   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
230   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
231
232 #ifdef HAVE_TRACING
233   int rank = smpi_comm_rank(MPI_COMM_WORLD);
234   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
235   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
236   extra->type = TRACING_ISEND;
237   extra->send_size = size;
238   extra->src = rank;
239   extra->dst = dst_traced;
240   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
241   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
242   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
243 #endif
244
245   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
246
247 #ifdef HAVE_TRACING
248   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
249   request->send = 1;
250 #endif
251
252   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
253
254   log_timed_action (action, clock);
255 }
256
257 static void action_recv(const char *const *action) {
258   int from = atoi(action[2]);
259   double size=parse_double(action[3]);
260   double clock = smpi_process_simulated_elapsed();
261   MPI_Status status;
262
263   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
264   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
265
266 #ifdef HAVE_TRACING
267   int rank = smpi_comm_rank(MPI_COMM_WORLD);
268   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
269
270   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
271   extra->type = TRACING_RECV;
272   extra->send_size = size;
273   extra->src = src_traced;
274   extra->dst = rank;
275   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
276   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
277 #endif
278
279   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
280
281 #ifdef HAVE_TRACING
282   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
283   TRACE_smpi_recv(rank, src_traced, rank);
284 #endif
285
286   log_timed_action (action, clock);
287 }
288
289 static void action_Irecv(const char *const *action)
290 {
291   int from = atoi(action[2]);
292   double size=parse_double(action[3]);
293   double clock = smpi_process_simulated_elapsed();
294   MPI_Request request;
295
296   smpi_replay_globals_t globals =
297      (smpi_replay_globals_t) smpi_process_get_user_data();
298
299   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
300   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
301
302 #ifdef HAVE_TRACING
303   int rank = smpi_comm_rank(MPI_COMM_WORLD);
304   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
305   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
306   extra->type = TRACING_IRECV;
307   extra->send_size = size;
308   extra->src = src_traced;
309   extra->dst = rank;
310   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
311   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
312 #endif
313
314   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
315
316 #ifdef HAVE_TRACING
317   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
318   request->recv = 1;
319 #endif
320   xbt_dynar_push(globals->irecvs,&request);
321   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
322
323   log_timed_action (action, clock);
324 }
325
326 static void action_wait(const char *const *action){
327   double clock = smpi_process_simulated_elapsed();
328   MPI_Request request;
329   MPI_Status status;
330   smpi_replay_globals_t globals =
331       (smpi_replay_globals_t) smpi_process_get_user_data();
332
333   xbt_assert(xbt_dynar_length(globals->irecvs),
334       "action wait not preceded by any irecv: %s",
335       xbt_str_join_array(action," "));
336   request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
337 #ifdef HAVE_TRACING
338   int rank = request && request->comm != MPI_COMM_NULL
339       ? smpi_comm_rank(request->comm)
340       : -1;
341
342   MPI_Group group = smpi_comm_group(request->comm);
343   int src_traced = smpi_group_rank(group, request->src);
344   int dst_traced = smpi_group_rank(group, request->dst);
345   int is_wait_for_receive = request->recv;
346   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
347   extra->type = TRACING_WAIT;
348   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
349 #endif
350   smpi_mpi_wait(&request, &status);
351 #ifdef HAVE_TRACING
352   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
353   if (is_wait_for_receive) {
354     TRACE_smpi_recv(rank, src_traced, dst_traced);
355   }
356 #endif
357
358   log_timed_action (action, clock);
359 }
360
361 static void action_waitall(const char *const *action){
362   double clock = smpi_process_simulated_elapsed();
363   int count_requests=0;
364   unsigned int i=0;
365
366   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
367
368   if (count_requests>0) {
369     MPI_Request requests[count_requests];
370     MPI_Status status[count_requests];
371
372     /*  The reqq is an array of dynars. Its index corresponds to the rank.
373      Thus each rank saves its own requests to the array request. */
374     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
375
376   #ifdef HAVE_TRACING
377    //save information from requests
378
379    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
380    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
381    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
382    for (i = 0; i < count_requests; i++) {
383     if(requests[i]){
384       int *asrc = xbt_new(int, 1);
385       int *adst = xbt_new(int, 1);
386       int *arecv = xbt_new(int, 1);
387       *asrc = requests[i]->src;
388       *adst = requests[i]->dst;
389       *arecv = requests[i]->recv;
390       xbt_dynar_insert_at(srcs, i, asrc);
391       xbt_dynar_insert_at(dsts, i, adst);
392       xbt_dynar_insert_at(recvs, i, arecv);
393       xbt_free(asrc);
394       xbt_free(adst);
395       xbt_free(arecv);
396     }else {
397       int *t = xbt_new(int, 1);
398       xbt_dynar_insert_at(srcs, i, t);
399       xbt_dynar_insert_at(dsts, i, t);
400       xbt_dynar_insert_at(recvs, i, t);
401       xbt_free(t);
402     }
403    }
404    int rank_traced = smpi_process_index();
405    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
406    extra->type = TRACING_WAITALL;
407    extra->send_size=count_requests;
408    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
409  #endif
410
411     smpi_mpi_waitall(count_requests, requests, status);
412
413   #ifdef HAVE_TRACING
414    for (i = 0; i < count_requests; i++) {
415     int src_traced, dst_traced, is_wait_for_receive;
416     xbt_dynar_get_cpy(srcs, i, &src_traced);
417     xbt_dynar_get_cpy(dsts, i, &dst_traced);
418     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
419     if (is_wait_for_receive) {
420       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
421     }
422    }
423    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
424    //clean-up of dynars
425    xbt_dynar_free(&srcs);
426    xbt_dynar_free(&dsts);
427    xbt_dynar_free(&recvs);
428   #endif
429
430    xbt_dynar_reset(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
431   }
432   log_timed_action (action, clock);
433 }
434
435 static void action_barrier(const char *const *action){
436   double clock = smpi_process_simulated_elapsed();
437 #ifdef HAVE_TRACING
438   int rank = smpi_comm_rank(MPI_COMM_WORLD);
439   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
440   extra->type = TRACING_BARRIER;
441   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
442 #endif
443   smpi_mpi_barrier(MPI_COMM_WORLD);
444 #ifdef HAVE_TRACING
445   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
446 #endif
447
448   log_timed_action (action, clock);
449 }
450
451
452 static void action_bcast(const char *const *action)
453 {
454   double size = parse_double(action[2]);
455   double clock = smpi_process_simulated_elapsed();
456   int root=0;
457   /*
458    * Initialize MPI_CURRENT_TYPE in order to decrease
459    * the number of the checks
460    * */
461   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
462
463   if(action[3]) {
464     root= atoi(action[3]);
465     if(action[4]) {
466       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
467     }
468   }
469
470 #ifdef HAVE_TRACING
471   int rank = smpi_comm_rank(MPI_COMM_WORLD);
472   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
473
474   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
475   extra->type = TRACING_BCAST;
476   extra->send_size = size;
477   extra->root = root_traced;
478   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
479   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
480
481 #endif
482
483   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
484 #ifdef HAVE_TRACING
485   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
486 #endif
487
488   log_timed_action (action, clock);
489 }
490
491 static void action_reduce(const char *const *action)
492 {
493   double comm_size = parse_double(action[2]);
494   double comp_size = parse_double(action[3]);
495   double clock = smpi_process_simulated_elapsed();
496   int root=0;
497   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498
499   if(action[4]) {
500     root= atoi(action[4]);
501     if(action[5]) {
502       MPI_CURRENT_TYPE=decode_datatype(action[5]);
503     }
504   }
505
506 #ifdef HAVE_TRACING
507   int rank = smpi_comm_rank(MPI_COMM_WORLD);
508   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
509   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
510   extra->type = TRACING_REDUCE;
511   extra->send_size = comm_size;
512   extra->comp_size = comp_size;
513   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
514   extra->root = root_traced;
515
516   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
517 #endif
518    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
519    smpi_execute_flops(comp_size);
520 #ifdef HAVE_TRACING
521   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
522 #endif
523
524   log_timed_action (action, clock);
525 }
526
527 static void action_allReduce(const char *const *action) {
528   double comm_size = parse_double(action[2]);
529   double comp_size = parse_double(action[3]);
530
531   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
532   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
533
534   double clock = smpi_process_simulated_elapsed();
535 #ifdef HAVE_TRACING
536   int rank = smpi_comm_rank(MPI_COMM_WORLD);
537   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538   extra->type = TRACING_ALLREDUCE;
539   extra->send_size = comm_size;
540   extra->comp_size = comp_size;
541   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
542
543   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
544 #endif
545   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
546   smpi_execute_flops(comp_size);
547   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
548 #ifdef HAVE_TRACING
549   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
550 #endif
551
552   log_timed_action (action, clock);
553 }
554
555 static void action_allToAll(const char *const *action) {
556   double clock = smpi_process_simulated_elapsed();
557   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
558   int send_size = parse_double(action[2]);
559   int recv_size = parse_double(action[3]);
560   MPI_Datatype MPI_CURRENT_TYPE2;
561
562   if(action[4]) {
563     MPI_CURRENT_TYPE=decode_datatype(action[4]);
564     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
565   }
566   else {
567     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
568     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
569   }
570   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
571   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
572
573 #ifdef HAVE_TRACING
574   int rank = smpi_process_index();
575   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
576   extra->type = TRACING_ALLTOALL;
577   extra->send_size = send_size;
578   extra->recv_size = recv_size;
579   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
580   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
581
582   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
583 #endif
584
585   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
586
587 #ifdef HAVE_TRACING
588   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
589 #endif
590
591   log_timed_action (action, clock);
592   xbt_free(send);
593   xbt_free(recv);
594 }
595
596
597 static void action_gather(const char *const *action) {
598   /*
599  The structure of the gather action for the rank 0 (total 4 processes) 
600  is the following:   
601  0 gather 68 68 0 0 0
602
603   where: 
604   1) 68 is the sendcounts
605   2) 68 is the recvcounts
606   3) 0 is the root node
607   4) 0 is the send datatype id, see decode_datatype()
608   5) 0 is the recv datatype id, see decode_datatype()
609   */
610   double clock = smpi_process_simulated_elapsed();
611   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
612   int send_size = parse_double(action[2]);
613   int recv_size = parse_double(action[3]);
614   MPI_Datatype MPI_CURRENT_TYPE2;
615   if(action[5]) {
616     MPI_CURRENT_TYPE=decode_datatype(action[5]);
617     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
618   } else {
619     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
620     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
621   }
622   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
623   void *recv = NULL;
624
625   int root=atoi(action[4]);
626   int rank = smpi_process_index();
627
628   if(rank==root)
629     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
630
631 #ifdef HAVE_TRACING
632   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
633   extra->type = TRACING_GATHER;
634   extra->send_size = send_size;
635   extra->recv_size = recv_size;
636   extra->root = root;
637   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
638   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
639
640   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
641 #endif
642 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
643                 recv, recv_size, MPI_CURRENT_TYPE2,
644                 root, MPI_COMM_WORLD);
645
646 #ifdef HAVE_TRACING
647   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
648 #endif
649
650   log_timed_action (action, clock);
651   xbt_free(send);
652   xbt_free(recv);
653 }
654
655
656
657 static void action_gatherv(const char *const *action) {
658   /*
659  The structure of the gatherv action for the rank 0 (total 4 processes)
660  is the following:
661  0 gather 68 68 10 10 10 0 0 0
662
663   where:
664   1) 68 is the sendcount
665   2) 68 10 10 10 is the recvcounts
666   3) 0 is the root node
667   4) 0 is the send datatype id, see decode_datatype()
668   5) 0 is the recv datatype id, see decode_datatype()
669   */
670   double clock = smpi_process_simulated_elapsed();
671   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
672   int send_size = parse_double(action[2]);
673   int *disps = xbt_new0(int, comm_size);
674   int *recvcounts = xbt_new0(int, comm_size);
675   int i=0,recv_sum=0;
676
677   MPI_Datatype MPI_CURRENT_TYPE2;
678   if(action[4+comm_size]) {
679     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
680     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
681   } else {
682     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
683     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
684   }
685   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
686   void *recv = NULL;
687   for(i=0;i<comm_size;i++) {
688     recvcounts[i] = atoi(action[i+3]);
689     recv_sum=recv_sum+recvcounts[i];
690     disps[i] = 0;
691   }
692
693   int root=atoi(action[3+comm_size]);
694   int rank = smpi_process_index();
695
696   if(rank==root)
697     recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
698
699 #ifdef HAVE_TRACING
700   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
701   extra->type = TRACING_GATHERV;
702   extra->send_size = send_size;
703   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
704   for(i=0; i< comm_size; i++)//copy data to avoid bad free
705     extra->recvcounts[i] = recvcounts[i];
706   extra->root = root;
707   extra->num_processes = comm_size;
708   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
709   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
710
711   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
712 #endif
713 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
714                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
715                 root, MPI_COMM_WORLD);
716
717 #ifdef HAVE_TRACING
718   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
719 #endif
720
721   log_timed_action (action, clock);
722   xbt_free(send);
723   xbt_free(recv);
724 }
725
726 static void action_reducescatter(const char *const *action) {
727
728     /*
729  The structure of the reducescatter action for the rank 0 (total 4 processes) 
730  is the following:   
731 0 reduceScatter 275427 275427 275427 204020 11346849 0
732
733   where: 
734   1) The first four values after the name of the action declare the recvcounts array
735   2) The value 11346849 is the amount of instructions
736   3) The last value corresponds to the datatype, see decode_datatype().
737
738   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
739
740    */
741
742   double clock = smpi_process_simulated_elapsed();
743   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
744   int comp_size = parse_double(action[2+comm_size]);
745   int *recvcounts = xbt_new0(int, comm_size);  
746   int *disps = xbt_new0(int, comm_size);  
747   int i=0,recv_sum=0;
748   int root=0;
749   int rank = smpi_process_index();
750
751   if(action[3+comm_size])
752     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
753   else
754     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
755
756   for(i=0;i<comm_size;i++) {
757     recvcounts[i] = atoi(action[i+2]);
758     recv_sum=recv_sum+recvcounts[i];
759     disps[i] = 0;
760   }
761
762 #ifdef HAVE_TRACING
763   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
764   extra->type = TRACING_REDUCE_SCATTER;
765   extra->send_size = 0;
766   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
767   for(i=0; i< comm_size; i++)//copy data to avoid bad free
768     extra->recvcounts[i] = recvcounts[i];
769   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
770   extra->comp_size = comp_size;
771   extra->num_processes = comm_size;
772
773
774   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
775 #endif
776    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
777        root, MPI_COMM_WORLD);
778    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
779                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
780    smpi_execute_flops(comp_size);
781
782
783 #ifdef HAVE_TRACING
784   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
785 #endif
786
787   log_timed_action (action, clock);
788 }
789
790
791 static void action_allgatherv(const char *const *action) {
792
793   /*
794  The structure of the allgatherv action for the rank 0 (total 4 processes) 
795  is the following:   
796 0 allGatherV 275427 275427 275427 275427 204020
797
798   where: 
799   1) 275427 is the sendcount
800   2) The next four elements declare the recvcounts array
801   3) No more values mean that the datatype for sent and receive buffer
802   is the default one, see decode_datatype().
803
804    */
805
806   double clock = smpi_process_simulated_elapsed();
807
808   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
809   int i=0;
810   int sendcount=atoi(action[2]);
811   int *recvcounts = xbt_new0(int, comm_size);  
812   int *disps = xbt_new0(int, comm_size);  
813   int recv_sum=0;  
814   MPI_Datatype MPI_CURRENT_TYPE2;
815
816   if(action[3+comm_size]) {
817     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
818     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
819   } else {
820     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
821     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
822   }
823   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
824
825   for(i=0;i<comm_size;i++) {
826     recvcounts[i] = atoi(action[i+3]);
827     recv_sum=recv_sum+recvcounts[i];
828   }
829   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
830
831 #ifdef HAVE_TRACING
832   int rank = smpi_process_index();
833   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
834   extra->type = TRACING_ALLGATHERV;
835   extra->send_size = sendcount;
836   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
837   for(i=0; i< comm_size; i++)//copy data to avoid bad free
838     extra->recvcounts[i] = recvcounts[i];
839   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
840   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
841   extra->num_processes = comm_size;
842
843   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
844 #endif
845
846 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
847
848 #ifdef HAVE_TRACING
849   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
850 #endif
851
852   log_timed_action (action, clock);
853   xbt_free(sendbuf);
854   xbt_free(recvbuf);
855   xbt_free(recvcounts);
856   xbt_free(disps);
857 }
858
859
860 static void action_allToAllv(const char *const *action) {
861   /*
862  The structure of the allToAllV action for the rank 0 (total 4 processes) 
863  is the following:   
864   0 allToAllV 100 1 7 10 12 100 1 70 10 5
865
866   where: 
867   1) 100 is the size of the send buffer *sizeof(int),
868   2) 1 7 10 12 is the sendcounts array
869   3) 100*sizeof(int) is the size of the receiver buffer
870   4)  1 70 10 5 is the recvcounts array
871
872    */
873
874
875   double clock = smpi_process_simulated_elapsed();
876
877   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
878   int send_buf_size=0,recv_buf_size=0,i=0;
879   int *sendcounts = xbt_new0(int, comm_size);  
880   int *recvcounts = xbt_new0(int, comm_size);  
881   int *senddisps = xbt_new0(int, comm_size);  
882   int *recvdisps = xbt_new0(int, comm_size);  
883
884   MPI_Datatype MPI_CURRENT_TYPE2;
885
886   send_buf_size=parse_double(action[2]);
887   recv_buf_size=parse_double(action[3+comm_size]);
888   if(action[4+2*comm_size]) {
889     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
890     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
891   }
892   else {
893       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
894       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
895   }
896
897   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
898   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
899
900   for(i=0;i<comm_size;i++) {
901     sendcounts[i] = atoi(action[i+3]);
902     recvcounts[i] = atoi(action[i+4+comm_size]);
903   }
904
905
906 #ifdef HAVE_TRACING
907   int rank = smpi_process_index();
908   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
909   extra->type = TRACING_ALLTOALLV;
910   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
911   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
912   extra->num_processes = comm_size;
913
914   for(i=0; i< comm_size; i++){//copy data to avoid bad free
915     extra->send_size += sendcounts[i];
916     extra->sendcounts[i] = sendcounts[i];
917     extra->recv_size += recvcounts[i];
918     extra->recvcounts[i] = recvcounts[i];
919   }
920   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
921   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
922
923   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
924 #endif
925     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
926                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
927                                MPI_COMM_WORLD);
928 #ifdef HAVE_TRACING
929   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
930 #endif
931
932   log_timed_action (action, clock);
933   xbt_free(sendbuf);
934   xbt_free(recvbuf);
935   xbt_free(sendcounts);
936   xbt_free(recvcounts);
937   xbt_free(senddisps);
938   xbt_free(recvdisps);
939 }
940
941 void smpi_replay_init(int *argc, char***argv){
942   smpi_process_init(argc, argv);
943   smpi_process_mark_as_initialized();
944 #ifdef HAVE_TRACING
945   int rank = smpi_process_index();
946   TRACE_smpi_init(rank);
947   TRACE_smpi_computing_init(rank);
948   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
949   extra->type = TRACING_INIT;
950   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
951   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
952 #endif
953
954   if (!smpi_process_index()){
955     _xbt_replay_action_init();
956     xbt_replay_action_register("init",       action_init);
957     xbt_replay_action_register("finalize",   action_finalize);
958     xbt_replay_action_register("comm_size",  action_comm_size);
959     xbt_replay_action_register("comm_split", action_comm_split);
960     xbt_replay_action_register("comm_dup",   action_comm_dup);
961     xbt_replay_action_register("send",       action_send);
962     xbt_replay_action_register("Isend",      action_Isend);
963     xbt_replay_action_register("recv",       action_recv);
964     xbt_replay_action_register("Irecv",      action_Irecv);
965     xbt_replay_action_register("wait",       action_wait);
966     xbt_replay_action_register("waitAll",    action_waitall);
967     xbt_replay_action_register("barrier",    action_barrier);
968     xbt_replay_action_register("bcast",      action_bcast);
969     xbt_replay_action_register("reduce",     action_reduce);
970     xbt_replay_action_register("allReduce",  action_allReduce);
971     xbt_replay_action_register("allToAll",   action_allToAll);
972     xbt_replay_action_register("allToAllV",  action_allToAllv);
973     xbt_replay_action_register("gather",  action_gather);
974     xbt_replay_action_register("gatherV",  action_gatherv);
975     xbt_replay_action_register("allGatherV",  action_allgatherv);
976     xbt_replay_action_register("reduceScatter",  action_reducescatter);
977     xbt_replay_action_register("compute",    action_compute);
978   }
979
980   xbt_replay_action_runner(*argc, *argv);
981 }
982
983 int smpi_replay_finalize(){
984   double sim_time= 1.;
985   /* One active process will stop. Decrease the counter*/
986   active_processes--;
987   XBT_DEBUG("There are %lu elements in reqq[*]",
988             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
989   xbt_dynar_free(&reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
990   if(!active_processes){
991     /* Last process alive speaking */
992     /* end the simulated timer */
993     sim_time = smpi_process_simulated_elapsed();
994     XBT_INFO("Simulation time %g", sim_time);
995     _xbt_replay_action_exit();
996     xbt_free(reqq);
997     reqq = NULL;
998   }
999   smpi_mpi_barrier(MPI_COMM_WORLD);
1000 #ifdef HAVE_TRACING
1001   int rank = smpi_process_index();
1002   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1003   extra->type = TRACING_FINALIZE;
1004   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1005 #endif
1006   smpi_process_finalize();
1007 #ifdef HAVE_TRACING
1008   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1009   TRACE_smpi_finalize(smpi_process_index());
1010 #endif
1011   smpi_process_destroy();
1012   return MPI_SUCCESS;
1013 }