Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
some files that are created on windows have \r to trim as well
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static void log_timed_action (const char *const *action, double clock){
23   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24     char *name = xbt_str_join_array(action, " ");
25     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
26     free(name);
27   }
28 }
29
30 /* Helper function */
31 static double parse_double(const char *string)
32 {
33   double value;
34   char *endptr;
35   value = strtod(string, &endptr);
36   if (*endptr != '\0')
37     THROWF(unknown_error, 0, "%s is not a double", string);
38   return value;
39 }
40
41 static MPI_Datatype decode_datatype(const char *const action)
42 {
43 // Declared datatypes,
44
45   switch(atoi(action))
46   {
47     case 0:
48       MPI_CURRENT_TYPE=MPI_DOUBLE;
49       break;
50     case 1:
51       MPI_CURRENT_TYPE=MPI_INT;
52       break;
53     case 2:
54       MPI_CURRENT_TYPE=MPI_CHAR;
55       break;
56     case 3:
57       MPI_CURRENT_TYPE=MPI_SHORT;
58       break;
59     case 4:
60       MPI_CURRENT_TYPE=MPI_LONG;
61       break;
62     case 5:
63       MPI_CURRENT_TYPE=MPI_FLOAT;
64       break;
65     case 6:
66       MPI_CURRENT_TYPE=MPI_BYTE;
67       break;
68     default:
69       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
70
71   }
72    return MPI_CURRENT_TYPE;
73 }
74
75
76 const char* encode_datatype(MPI_Datatype datatype)
77 {
78
79   //default type for output is set to MPI_BYTE
80   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
81   if (datatype==MPI_BYTE){
82       return "";
83   }
84   if(datatype==MPI_DOUBLE)
85       return "0";
86   if(datatype==MPI_INT)
87       return "1";
88   if(datatype==MPI_CHAR)
89       return "2";
90   if(datatype==MPI_SHORT)
91       return "3";
92   if(datatype==MPI_LONG)
93     return "4";
94   if(datatype==MPI_FLOAT)
95       return "5";
96
97   // default - not implemented.
98   // do not warn here as we pass in this function even for other trace formats
99   return "-1";
100 }
101
102 static void action_init(const char *const *action)
103 {
104   int i;
105   XBT_DEBUG("Initialize the counters");
106
107   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
108   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
109
110   /* start a simulated timer */
111   smpi_process_simulated_start();
112   /*initialize the number of active processes */
113   active_processes = smpi_process_count();
114
115   if (!reqq) {
116     reqq=xbt_new0(xbt_dynar_t,active_processes);
117
118     for(i=0;i<active_processes;i++){
119       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
120     }
121   }
122 }
123
124 static void action_finalize(const char *const *action)
125 {
126 }
127
128 static void action_comm_size(const char *const *action)
129 {
130   double clock = smpi_process_simulated_elapsed();
131
132   communicator_size = parse_double(action[2]);
133   log_timed_action (action, clock);
134 }
135
136 static void action_comm_split(const char *const *action)
137 {
138   double clock = smpi_process_simulated_elapsed();
139
140   log_timed_action (action, clock);
141 }
142
143 static void action_comm_dup(const char *const *action)
144 {
145   double clock = smpi_process_simulated_elapsed();
146
147   log_timed_action (action, clock);
148 }
149
150 static void action_compute(const char *const *action)
151 {
152   double clock = smpi_process_simulated_elapsed();
153   double flops= parse_double(action[2]);
154 #ifdef HAVE_TRACING
155   int rank = smpi_comm_rank(MPI_COMM_WORLD);
156   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
157   extra->type=TRACING_COMPUTING;
158   extra->comp_size=flops;
159   TRACE_smpi_computing_in(rank, extra);
160 #endif
161   smpi_execute_flops(flops);
162 #ifdef HAVE_TRACING
163   TRACE_smpi_computing_out(rank);
164 #endif
165
166   log_timed_action (action, clock);
167 }
168
169 static void action_send(const char *const *action)
170 {
171   int to = atoi(action[2]);
172   double size=parse_double(action[3]);
173   double clock = smpi_process_simulated_elapsed();
174
175   if(action[4]) {
176     MPI_CURRENT_TYPE=decode_datatype(action[4]);
177   } else {
178     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
179   }
180
181 #ifdef HAVE_TRACING
182   int rank = smpi_comm_rank(MPI_COMM_WORLD);
183
184   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
185   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
186   extra->type = TRACING_SEND;
187   extra->send_size = size;
188   extra->src = rank;
189   extra->dst = dst_traced;
190   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
191   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
192   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
193 #endif
194
195   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
196
197   log_timed_action (action, clock);
198
199   #ifdef HAVE_TRACING
200   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
201 #endif
202
203 }
204
205 static void action_Isend(const char *const *action)
206 {
207   int to = atoi(action[2]);
208   double size=parse_double(action[3]);
209   double clock = smpi_process_simulated_elapsed();
210   MPI_Request request;
211
212   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
213   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
214
215 #ifdef HAVE_TRACING
216   int rank = smpi_comm_rank(MPI_COMM_WORLD);
217   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
218   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
219   extra->type = TRACING_ISEND;
220   extra->send_size = size;
221   extra->src = rank;
222   extra->dst = dst_traced;
223   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
224   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
225   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
226 #endif
227
228   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
229
230 #ifdef HAVE_TRACING
231   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
232   request->send = 1;
233 #endif
234
235   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
236
237   log_timed_action (action, clock);
238 }
239
240 static void action_recv(const char *const *action) {
241   int from = atoi(action[2]);
242   double size=parse_double(action[3]);
243   double clock = smpi_process_simulated_elapsed();
244   MPI_Status status;
245
246   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
247   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
248
249 #ifdef HAVE_TRACING
250   int rank = smpi_comm_rank(MPI_COMM_WORLD);
251   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
252
253   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
254   extra->type = TRACING_RECV;
255   extra->send_size = size;
256   extra->src = src_traced;
257   extra->dst = rank;
258   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
259   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
260 #endif
261
262   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
263
264 #ifdef HAVE_TRACING
265   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
266   TRACE_smpi_recv(rank, src_traced, rank);
267 #endif
268
269   log_timed_action (action, clock);
270 }
271
272 static void action_Irecv(const char *const *action)
273 {
274   int from = atoi(action[2]);
275   double size=parse_double(action[3]);
276   double clock = smpi_process_simulated_elapsed();
277   MPI_Request request;
278
279   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
280   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
281
282 #ifdef HAVE_TRACING
283   int rank = smpi_comm_rank(MPI_COMM_WORLD);
284   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
285   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
286   extra->type = TRACING_IRECV;
287   extra->send_size = size;
288   extra->src = src_traced;
289   extra->dst = rank;
290   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
291   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
292 #endif
293
294   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
295
296 #ifdef HAVE_TRACING
297   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
298   request->recv = 1;
299 #endif
300   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
301
302   log_timed_action (action, clock);
303 }
304
305 static void action_wait(const char *const *action){
306   double clock = smpi_process_simulated_elapsed();
307   MPI_Request request;
308   MPI_Status status;
309
310   xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
311       "action wait not preceded by any irecv or isend: %s",
312       xbt_str_join_array(action," "));
313   request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
314   xbt_assert(request != NULL, "found null request in reqq");
315 #ifdef HAVE_TRACING
316   int rank = request->comm != MPI_COMM_NULL
317       ? smpi_comm_rank(request->comm)
318       : -1;
319
320   MPI_Group group = smpi_comm_group(request->comm);
321   int src_traced = smpi_group_rank(group, request->src);
322   int dst_traced = smpi_group_rank(group, request->dst);
323   int is_wait_for_receive = request->recv;
324   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
325   extra->type = TRACING_WAIT;
326   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
327 #endif
328   smpi_mpi_wait(&request, &status);
329 #ifdef HAVE_TRACING
330   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
331   if (is_wait_for_receive) {
332     TRACE_smpi_recv(rank, src_traced, dst_traced);
333   }
334 #endif
335
336   log_timed_action (action, clock);
337 }
338
339 static void action_waitall(const char *const *action){
340   double clock = smpi_process_simulated_elapsed();
341   int count_requests=0;
342   unsigned int i=0;
343
344   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
345
346   if (count_requests>0) {
347     MPI_Request requests[count_requests];
348     MPI_Status status[count_requests];
349
350     /*  The reqq is an array of dynars. Its index corresponds to the rank.
351      Thus each rank saves its own requests to the array request. */
352     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
353
354   #ifdef HAVE_TRACING
355    //save information from requests
356
357    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
358    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
359    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
360    for (i = 0; i < count_requests; i++) {
361     if(requests[i]){
362       int *asrc = xbt_new(int, 1);
363       int *adst = xbt_new(int, 1);
364       int *arecv = xbt_new(int, 1);
365       *asrc = requests[i]->src;
366       *adst = requests[i]->dst;
367       *arecv = requests[i]->recv;
368       xbt_dynar_insert_at(srcs, i, asrc);
369       xbt_dynar_insert_at(dsts, i, adst);
370       xbt_dynar_insert_at(recvs, i, arecv);
371       xbt_free(asrc);
372       xbt_free(adst);
373       xbt_free(arecv);
374     }else {
375       int *t = xbt_new(int, 1);
376       xbt_dynar_insert_at(srcs, i, t);
377       xbt_dynar_insert_at(dsts, i, t);
378       xbt_dynar_insert_at(recvs, i, t);
379       xbt_free(t);
380     }
381    }
382    int rank_traced = smpi_process_index();
383    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
384    extra->type = TRACING_WAITALL;
385    extra->send_size=count_requests;
386    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
387  #endif
388
389     smpi_mpi_waitall(count_requests, requests, status);
390
391   #ifdef HAVE_TRACING
392    for (i = 0; i < count_requests; i++) {
393     int src_traced, dst_traced, is_wait_for_receive;
394     xbt_dynar_get_cpy(srcs, i, &src_traced);
395     xbt_dynar_get_cpy(dsts, i, &dst_traced);
396     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
397     if (is_wait_for_receive) {
398       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
399     }
400    }
401    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
402    //clean-up of dynars
403    xbt_dynar_free(&srcs);
404    xbt_dynar_free(&dsts);
405    xbt_dynar_free(&recvs);
406   #endif
407
408    xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
409   }
410   log_timed_action (action, clock);
411 }
412
413 static void action_barrier(const char *const *action){
414   double clock = smpi_process_simulated_elapsed();
415 #ifdef HAVE_TRACING
416   int rank = smpi_comm_rank(MPI_COMM_WORLD);
417   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
418   extra->type = TRACING_BARRIER;
419   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
420 #endif
421   smpi_mpi_barrier(MPI_COMM_WORLD);
422 #ifdef HAVE_TRACING
423   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
424 #endif
425
426   log_timed_action (action, clock);
427 }
428
429
430 static void action_bcast(const char *const *action)
431 {
432   double size = parse_double(action[2]);
433   double clock = smpi_process_simulated_elapsed();
434   int root=0;
435   /*
436    * Initialize MPI_CURRENT_TYPE in order to decrease
437    * the number of the checks
438    * */
439   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
440
441   if(action[3]) {
442     root= atoi(action[3]);
443     if(action[4]) {
444       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
445     }
446   }
447
448 #ifdef HAVE_TRACING
449   int rank = smpi_comm_rank(MPI_COMM_WORLD);
450   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
451
452   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
453   extra->type = TRACING_BCAST;
454   extra->send_size = size;
455   extra->root = root_traced;
456   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
457   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
458
459 #endif
460
461   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
462 #ifdef HAVE_TRACING
463   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
464 #endif
465
466   log_timed_action (action, clock);
467 }
468
469 static void action_reduce(const char *const *action)
470 {
471   double comm_size = parse_double(action[2]);
472   double comp_size = parse_double(action[3]);
473   double clock = smpi_process_simulated_elapsed();
474   int root=0;
475   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
476
477   if(action[4]) {
478     root= atoi(action[4]);
479     if(action[5]) {
480       MPI_CURRENT_TYPE=decode_datatype(action[5]);
481     }
482   }
483
484 #ifdef HAVE_TRACING
485   int rank = smpi_comm_rank(MPI_COMM_WORLD);
486   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
487   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
488   extra->type = TRACING_REDUCE;
489   extra->send_size = comm_size;
490   extra->comp_size = comp_size;
491   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
492   extra->root = root_traced;
493
494   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
495 #endif
496    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
497    smpi_execute_flops(comp_size);
498 #ifdef HAVE_TRACING
499   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
500 #endif
501
502   log_timed_action (action, clock);
503 }
504
505 static void action_allReduce(const char *const *action) {
506   double comm_size = parse_double(action[2]);
507   double comp_size = parse_double(action[3]);
508
509   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
510   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
511
512   double clock = smpi_process_simulated_elapsed();
513 #ifdef HAVE_TRACING
514   int rank = smpi_comm_rank(MPI_COMM_WORLD);
515   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
516   extra->type = TRACING_ALLREDUCE;
517   extra->send_size = comm_size;
518   extra->comp_size = comp_size;
519   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
520
521   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
522 #endif
523   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
524   smpi_execute_flops(comp_size);
525   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
526 #ifdef HAVE_TRACING
527   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
528 #endif
529
530   log_timed_action (action, clock);
531 }
532
533 static void action_allToAll(const char *const *action) {
534   double clock = smpi_process_simulated_elapsed();
535   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
536   int send_size = parse_double(action[2]);
537   int recv_size = parse_double(action[3]);
538   MPI_Datatype MPI_CURRENT_TYPE2;
539
540   if(action[4]) {
541     MPI_CURRENT_TYPE=decode_datatype(action[4]);
542     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
543   }
544   else {
545     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
546     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
547   }
548   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
549   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
550
551 #ifdef HAVE_TRACING
552   int rank = smpi_process_index();
553   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
554   extra->type = TRACING_ALLTOALL;
555   extra->send_size = send_size;
556   extra->recv_size = recv_size;
557   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
558   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
559
560   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
561 #endif
562
563   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
564
565 #ifdef HAVE_TRACING
566   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
567 #endif
568
569   log_timed_action (action, clock);
570   xbt_free(send);
571   xbt_free(recv);
572 }
573
574
575 static void action_gather(const char *const *action) {
576   /*
577  The structure of the gather action for the rank 0 (total 4 processes) 
578  is the following:   
579  0 gather 68 68 0 0 0
580
581   where: 
582   1) 68 is the sendcounts
583   2) 68 is the recvcounts
584   3) 0 is the root node
585   4) 0 is the send datatype id, see decode_datatype()
586   5) 0 is the recv datatype id, see decode_datatype()
587   */
588   double clock = smpi_process_simulated_elapsed();
589   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
590   int send_size = parse_double(action[2]);
591   int recv_size = parse_double(action[3]);
592   MPI_Datatype MPI_CURRENT_TYPE2;
593   if(action[5]) {
594     MPI_CURRENT_TYPE=decode_datatype(action[5]);
595     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
596   } else {
597     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
598     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
599   }
600   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
601   void *recv = NULL;
602
603   int root=atoi(action[4]);
604   int rank = smpi_process_index();
605
606   if(rank==root)
607     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
608
609 #ifdef HAVE_TRACING
610   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
611   extra->type = TRACING_GATHER;
612   extra->send_size = send_size;
613   extra->recv_size = recv_size;
614   extra->root = root;
615   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
616   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
617
618   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
619 #endif
620 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
621                 recv, recv_size, MPI_CURRENT_TYPE2,
622                 root, MPI_COMM_WORLD);
623
624 #ifdef HAVE_TRACING
625   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
626 #endif
627
628   log_timed_action (action, clock);
629   xbt_free(send);
630   xbt_free(recv);
631 }
632
633
634
635 static void action_gatherv(const char *const *action) {
636   /*
637  The structure of the gatherv action for the rank 0 (total 4 processes)
638  is the following:
639  0 gather 68 68 10 10 10 0 0 0
640
641   where:
642   1) 68 is the sendcount
643   2) 68 10 10 10 is the recvcounts
644   3) 0 is the root node
645   4) 0 is the send datatype id, see decode_datatype()
646   5) 0 is the recv datatype id, see decode_datatype()
647   */
648   double clock = smpi_process_simulated_elapsed();
649   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
650   int send_size = parse_double(action[2]);
651   int *disps = xbt_new0(int, comm_size);
652   int *recvcounts = xbt_new0(int, comm_size);
653   int i=0,recv_sum=0;
654
655   MPI_Datatype MPI_CURRENT_TYPE2;
656   if(action[4+comm_size]) {
657     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
658     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
659   } else {
660     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
661     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
662   }
663   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
664   void *recv = NULL;
665   for(i=0;i<comm_size;i++) {
666     recvcounts[i] = atoi(action[i+3]);
667     recv_sum=recv_sum+recvcounts[i];
668     disps[i] = 0;
669   }
670
671   int root=atoi(action[3+comm_size]);
672   int rank = smpi_process_index();
673
674   if(rank==root)
675     recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
676
677 #ifdef HAVE_TRACING
678   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
679   extra->type = TRACING_GATHERV;
680   extra->send_size = send_size;
681   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
682   for(i=0; i< comm_size; i++)//copy data to avoid bad free
683     extra->recvcounts[i] = recvcounts[i];
684   extra->root = root;
685   extra->num_processes = comm_size;
686   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
687   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
688
689   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
690 #endif
691 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
692                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
693                 root, MPI_COMM_WORLD);
694
695 #ifdef HAVE_TRACING
696   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
697 #endif
698
699   log_timed_action (action, clock);
700   xbt_free(recvcounts);
701   xbt_free(send);
702   xbt_free(recv);
703   xbt_free(disps);
704
705 }
706
707 static void action_reducescatter(const char *const *action) {
708
709     /*
710  The structure of the reducescatter action for the rank 0 (total 4 processes) 
711  is the following:   
712 0 reduceScatter 275427 275427 275427 204020 11346849 0
713
714   where: 
715   1) The first four values after the name of the action declare the recvcounts array
716   2) The value 11346849 is the amount of instructions
717   3) The last value corresponds to the datatype, see decode_datatype().
718
719   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
720
721    */
722
723   double clock = smpi_process_simulated_elapsed();
724   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
725   int comp_size = parse_double(action[2+comm_size]);
726   int *recvcounts = xbt_new0(int, comm_size);  
727   int *disps = xbt_new0(int, comm_size);  
728   int i=0,recv_sum=0;
729   int root=0;
730   int rank = smpi_process_index();
731
732   if(action[3+comm_size])
733     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
734   else
735     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
736
737   for(i=0;i<comm_size;i++) {
738     recvcounts[i] = atoi(action[i+2]);
739     recv_sum=recv_sum+recvcounts[i];
740     disps[i] = 0;
741   }
742
743 #ifdef HAVE_TRACING
744   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
745   extra->type = TRACING_REDUCE_SCATTER;
746   extra->send_size = 0;
747   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
748   for(i=0; i< comm_size; i++)//copy data to avoid bad free
749     extra->recvcounts[i] = recvcounts[i];
750   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
751   extra->comp_size = comp_size;
752   extra->num_processes = comm_size;
753
754
755   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
756 #endif
757    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
758        root, MPI_COMM_WORLD);
759    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
760                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
761    smpi_execute_flops(comp_size);
762
763
764 #ifdef HAVE_TRACING
765   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
766 #endif
767   xbt_free(recvcounts);
768   xbt_free(disps);
769   log_timed_action (action, clock);
770 }
771
772
773 static void action_allgatherv(const char *const *action) {
774
775   /*
776  The structure of the allgatherv action for the rank 0 (total 4 processes) 
777  is the following:   
778 0 allGatherV 275427 275427 275427 275427 204020
779
780   where: 
781   1) 275427 is the sendcount
782   2) The next four elements declare the recvcounts array
783   3) No more values mean that the datatype for sent and receive buffer
784   is the default one, see decode_datatype().
785
786    */
787
788   double clock = smpi_process_simulated_elapsed();
789
790   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
791   int i=0;
792   int sendcount=atoi(action[2]);
793   int *recvcounts = xbt_new0(int, comm_size);  
794   int *disps = xbt_new0(int, comm_size);  
795   int recv_sum=0;  
796   MPI_Datatype MPI_CURRENT_TYPE2;
797
798   if(action[3+comm_size]) {
799     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
800     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
801   } else {
802     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
803     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
804   }
805   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
806
807   for(i=0;i<comm_size;i++) {
808     recvcounts[i] = atoi(action[i+3]);
809     recv_sum=recv_sum+recvcounts[i];
810   }
811   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
812
813 #ifdef HAVE_TRACING
814   int rank = smpi_process_index();
815   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
816   extra->type = TRACING_ALLGATHERV;
817   extra->send_size = sendcount;
818   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
819   for(i=0; i< comm_size; i++)//copy data to avoid bad free
820     extra->recvcounts[i] = recvcounts[i];
821   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
822   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
823   extra->num_processes = comm_size;
824
825   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
826 #endif
827
828 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
829
830 #ifdef HAVE_TRACING
831   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
832 #endif
833
834   log_timed_action (action, clock);
835   xbt_free(sendbuf);
836   xbt_free(recvbuf);
837   xbt_free(recvcounts);
838   xbt_free(disps);
839 }
840
841
842 static void action_allToAllv(const char *const *action) {
843   /*
844  The structure of the allToAllV action for the rank 0 (total 4 processes) 
845  is the following:   
846   0 allToAllV 100 1 7 10 12 100 1 70 10 5
847
848   where: 
849   1) 100 is the size of the send buffer *sizeof(int),
850   2) 1 7 10 12 is the sendcounts array
851   3) 100*sizeof(int) is the size of the receiver buffer
852   4)  1 70 10 5 is the recvcounts array
853
854    */
855
856
857   double clock = smpi_process_simulated_elapsed();
858
859   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
860   int send_buf_size=0,recv_buf_size=0,i=0;
861   int *sendcounts = xbt_new0(int, comm_size);  
862   int *recvcounts = xbt_new0(int, comm_size);  
863   int *senddisps = xbt_new0(int, comm_size);  
864   int *recvdisps = xbt_new0(int, comm_size);  
865
866   MPI_Datatype MPI_CURRENT_TYPE2;
867
868   send_buf_size=parse_double(action[2]);
869   recv_buf_size=parse_double(action[3+comm_size]);
870   if(action[4+2*comm_size]) {
871     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
872     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
873   }
874   else {
875       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
876       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
877   }
878
879   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
880   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
881
882   for(i=0;i<comm_size;i++) {
883     sendcounts[i] = atoi(action[i+3]);
884     recvcounts[i] = atoi(action[i+4+comm_size]);
885   }
886
887
888 #ifdef HAVE_TRACING
889   int rank = smpi_process_index();
890   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
891   extra->type = TRACING_ALLTOALLV;
892   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
893   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
894   extra->num_processes = comm_size;
895
896   for(i=0; i< comm_size; i++){//copy data to avoid bad free
897     extra->send_size += sendcounts[i];
898     extra->sendcounts[i] = sendcounts[i];
899     extra->recv_size += recvcounts[i];
900     extra->recvcounts[i] = recvcounts[i];
901   }
902   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
903   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
904
905   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
906 #endif
907     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
908                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
909                                MPI_COMM_WORLD);
910 #ifdef HAVE_TRACING
911   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
912 #endif
913
914   log_timed_action (action, clock);
915   xbt_free(sendbuf);
916   xbt_free(recvbuf);
917   xbt_free(sendcounts);
918   xbt_free(recvcounts);
919   xbt_free(senddisps);
920   xbt_free(recvdisps);
921 }
922
923 void smpi_replay_init(int *argc, char***argv){
924   smpi_process_init(argc, argv);
925   smpi_process_mark_as_initialized();
926 #ifdef HAVE_TRACING
927   int rank = smpi_process_index();
928   TRACE_smpi_init(rank);
929   TRACE_smpi_computing_init(rank);
930   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
931   extra->type = TRACING_INIT;
932   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
933   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
934 #endif
935
936   if (!smpi_process_index()){
937     _xbt_replay_action_init();
938     xbt_replay_action_register("init",       action_init);
939     xbt_replay_action_register("finalize",   action_finalize);
940     xbt_replay_action_register("comm_size",  action_comm_size);
941     xbt_replay_action_register("comm_split", action_comm_split);
942     xbt_replay_action_register("comm_dup",   action_comm_dup);
943     xbt_replay_action_register("send",       action_send);
944     xbt_replay_action_register("Isend",      action_Isend);
945     xbt_replay_action_register("recv",       action_recv);
946     xbt_replay_action_register("Irecv",      action_Irecv);
947     xbt_replay_action_register("wait",       action_wait);
948     xbt_replay_action_register("waitAll",    action_waitall);
949     xbt_replay_action_register("barrier",    action_barrier);
950     xbt_replay_action_register("bcast",      action_bcast);
951     xbt_replay_action_register("reduce",     action_reduce);
952     xbt_replay_action_register("allReduce",  action_allReduce);
953     xbt_replay_action_register("allToAll",   action_allToAll);
954     xbt_replay_action_register("allToAllV",  action_allToAllv);
955     xbt_replay_action_register("gather",  action_gather);
956     xbt_replay_action_register("gatherV",  action_gatherv);
957     xbt_replay_action_register("allGatherV",  action_allgatherv);
958     xbt_replay_action_register("reduceScatter",  action_reducescatter);
959     xbt_replay_action_register("compute",    action_compute);
960   }
961
962   xbt_replay_action_runner(*argc, *argv);
963 }
964
965 int smpi_replay_finalize(){
966   double sim_time= 1.;
967   /* One active process will stop. Decrease the counter*/
968   XBT_DEBUG("There are %lu elements in reqq[*]",
969             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
970   if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
971     int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
972     MPI_Request requests[count_requests];
973     MPI_Status status[count_requests];
974     unsigned int i;
975
976     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
977     smpi_mpi_waitall(count_requests, requests, status);
978     active_processes--;
979   } else {
980     active_processes--;
981   }
982
983   xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
984
985   if(!active_processes){
986     /* Last process alive speaking */
987     /* end the simulated timer */
988     sim_time = smpi_process_simulated_elapsed();
989     XBT_INFO("Simulation time %g", sim_time);
990     _xbt_replay_action_exit();
991     xbt_free(reqq);
992     reqq = NULL;
993   }
994   smpi_mpi_barrier(MPI_COMM_WORLD);
995 #ifdef HAVE_TRACING
996   int rank = smpi_process_index();
997   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
998   extra->type = TRACING_FINALIZE;
999   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1000 #endif
1001   smpi_process_finalize();
1002 #ifdef HAVE_TRACING
1003   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1004   TRACE_smpi_finalize(smpi_process_index());
1005 #endif
1006   smpi_process_destroy();
1007   return MPI_SUCCESS;
1008 }