Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
root should be optional for gather replay (as it is everywhere else)
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36   if (!smpi_process_get_replaying())
37         return xbt_malloc(size);
38   if (sendbuffer_size<size){
39     sendbuffer=xbt_realloc(sendbuffer,size);
40     sendbuffer_size=size;
41   }
42   return sendbuffer;
43 }
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46   if (!smpi_process_get_replaying())
47         return xbt_malloc(size);
48   if (recvbuffer_size<size){
49     recvbuffer=xbt_realloc(recvbuffer,size);
50     recvbuffer_size=size;
51   }
52   return sendbuffer;
53 }
54
55 void smpi_free_tmp_buffer(void* buf){
56   if (!smpi_process_get_replaying())
57     xbt_free(buf);
58 }
59
60 /* Helper function */
61 static double parse_double(const char *string)
62 {
63   double value;
64   char *endptr;
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static MPI_Datatype decode_datatype(const char *const action)
72 {
73 // Declared datatypes,
74
75   switch(atoi(action))
76   {
77     case 0:
78       MPI_CURRENT_TYPE=MPI_DOUBLE;
79       break;
80     case 1:
81       MPI_CURRENT_TYPE=MPI_INT;
82       break;
83     case 2:
84       MPI_CURRENT_TYPE=MPI_CHAR;
85       break;
86     case 3:
87       MPI_CURRENT_TYPE=MPI_SHORT;
88       break;
89     case 4:
90       MPI_CURRENT_TYPE=MPI_LONG;
91       break;
92     case 5:
93       MPI_CURRENT_TYPE=MPI_FLOAT;
94       break;
95     case 6:
96       MPI_CURRENT_TYPE=MPI_BYTE;
97       break;
98     default:
99       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
100
101   }
102    return MPI_CURRENT_TYPE;
103 }
104
105
106 const char* encode_datatype(MPI_Datatype datatype)
107 {
108
109   //default type for output is set to MPI_BYTE
110   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111   if (datatype==MPI_BYTE){
112       return "";
113   }
114   if(datatype==MPI_DOUBLE)
115       return "0";
116   if(datatype==MPI_INT)
117       return "1";
118   if(datatype==MPI_CHAR)
119       return "2";
120   if(datatype==MPI_SHORT)
121       return "3";
122   if(datatype==MPI_LONG)
123     return "4";
124   if(datatype==MPI_FLOAT)
125       return "5";
126
127   // default - not implemented.
128   // do not warn here as we pass in this function even for other trace formats
129   return "-1";
130 }
131
132 static void action_init(const char *const *action)
133 {
134   int i;
135   XBT_DEBUG("Initialize the counters");
136
137   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
138   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
139
140   /* start a simulated timer */
141   smpi_process_simulated_start();
142   /*initialize the number of active processes */
143   active_processes = smpi_process_count();
144
145   if (!reqq) {
146     reqq=xbt_new0(xbt_dynar_t,active_processes);
147
148     for(i=0;i<active_processes;i++){
149       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
150     }
151   }
152 }
153
154 static void action_finalize(const char *const *action)
155 {
156 }
157
158 static void action_comm_size(const char *const *action)
159 {
160   double clock = smpi_process_simulated_elapsed();
161
162   communicator_size = parse_double(action[2]);
163   log_timed_action (action, clock);
164 }
165
166 static void action_comm_split(const char *const *action)
167 {
168   double clock = smpi_process_simulated_elapsed();
169
170   log_timed_action (action, clock);
171 }
172
173 static void action_comm_dup(const char *const *action)
174 {
175   double clock = smpi_process_simulated_elapsed();
176
177   log_timed_action (action, clock);
178 }
179
180 static void action_compute(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183   double flops= parse_double(action[2]);
184 #ifdef HAVE_TRACING
185   int rank = smpi_process_index();
186   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187   extra->type=TRACING_COMPUTING;
188   extra->comp_size=flops;
189   TRACE_smpi_computing_in(rank, extra);
190 #endif
191   smpi_execute_flops(flops);
192 #ifdef HAVE_TRACING
193   TRACE_smpi_computing_out(rank);
194 #endif
195
196   log_timed_action (action, clock);
197 }
198
199 static void action_send(const char *const *action)
200 {
201   int to = atoi(action[2]);
202   double size=parse_double(action[3]);
203   double clock = smpi_process_simulated_elapsed();
204
205   if(action[4]) {
206     MPI_CURRENT_TYPE=decode_datatype(action[4]);
207   } else {
208     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
209   }
210
211 #ifdef HAVE_TRACING
212   int rank = smpi_process_index();
213
214   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216   extra->type = TRACING_SEND;
217   extra->send_size = size;
218   extra->src = rank;
219   extra->dst = dst_traced;
220   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
223 #endif
224
225   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
226
227   log_timed_action (action, clock);
228
229   #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231 #endif
232
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process_simulated_elapsed();
240   MPI_Request request;
241
242   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
244
245 #ifdef HAVE_TRACING
246   int rank = smpi_process_index();
247   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249   extra->type = TRACING_ISEND;
250   extra->send_size = size;
251   extra->src = rank;
252   extra->dst = dst_traced;
253   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
256 #endif
257
258   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
259
260 #ifdef HAVE_TRACING
261   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
262   request->send = 1;
263 #endif
264
265   xbt_dynar_push(reqq[smpi_process_index()],&request);
266
267   log_timed_action (action, clock);
268 }
269
270 static void action_recv(const char *const *action) {
271   int from = atoi(action[2]);
272   double size=parse_double(action[3]);
273   double clock = smpi_process_simulated_elapsed();
274   MPI_Status status;
275
276   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279 #ifdef HAVE_TRACING
280   int rank = smpi_process_index();
281   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
282
283   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284   extra->type = TRACING_RECV;
285   extra->send_size = size;
286   extra->src = src_traced;
287   extra->dst = rank;
288   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
290 #endif
291
292   //unknow size from the receiver pov
293   if(size==-1){
294       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
295       size=status.count;
296   }
297
298   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
299
300 #ifdef HAVE_TRACING
301   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302   TRACE_smpi_recv(rank, src_traced, rank);
303 #endif
304
305   log_timed_action (action, clock);
306 }
307
308 static void action_Irecv(const char *const *action)
309 {
310   int from = atoi(action[2]);
311   double size=parse_double(action[3]);
312   double clock = smpi_process_simulated_elapsed();
313   MPI_Request request;
314
315   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
317
318 #ifdef HAVE_TRACING
319   int rank = smpi_process_index();
320   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322   extra->type = TRACING_IRECV;
323   extra->send_size = size;
324   extra->src = src_traced;
325   extra->dst = rank;
326   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
328 #endif
329   MPI_Status status;
330   //unknow size from the receiver pov
331   if(size==-1){
332       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
333       size=status.count;
334   }
335
336   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
340   request->recv = 1;
341 #endif
342   xbt_dynar_push(reqq[smpi_process_index()],&request);
343
344   log_timed_action (action, clock);
345 }
346
347 static void action_test(const char *const *action){
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Request request;
350   MPI_Status status;
351   int flag = TRUE;
352
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354   //if request is null here, this may mean that a previous test has succeeded 
355   //Different times in traced application and replayed version may lead to this 
356   //In this case, ignore the extra calls.
357   if(request){
358 #ifdef HAVE_TRACING
359   int rank = smpi_process_index();
360   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361   extra->type=TRACING_TEST;
362   TRACE_smpi_testing_in(rank, extra);
363 #endif
364
365     flag = smpi_mpi_test(&request, &status);
366
367   XBT_DEBUG("MPI_Test result: %d", flag);
368   /* push back request in dynar to be caught by a subsequent wait. if the test
369    * did succeed, the request is now NULL.
370    */
371   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
372
373 #ifdef HAVE_TRACING
374   TRACE_smpi_testing_out(rank);
375 #endif
376   }
377   log_timed_action (action, clock);
378 }
379
380 static void action_wait(const char *const *action){
381   double clock = smpi_process_simulated_elapsed();
382   MPI_Request request;
383   MPI_Status status;
384
385   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
386       "action wait not preceded by any irecv or isend: %s",
387       xbt_str_join_array(action," "));
388   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
389
390   if (!request){
391     /* Assuming that the trace is well formed, this mean the comm might have
392      * been caught by a MPI_test. Then just return.
393      */
394     return;
395   }
396
397 #ifdef HAVE_TRACING
398   int rank = request->comm != MPI_COMM_NULL
399       ? smpi_comm_rank(request->comm)
400       : -1;
401
402   MPI_Group group = smpi_comm_group(request->comm);
403   int src_traced = smpi_group_rank(group, request->src);
404   int dst_traced = smpi_group_rank(group, request->dst);
405   int is_wait_for_receive = request->recv;
406   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
407   extra->type = TRACING_WAIT;
408   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
409 #endif
410   smpi_mpi_wait(&request, &status);
411 #ifdef HAVE_TRACING
412   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
413   if (is_wait_for_receive) {
414     TRACE_smpi_recv(rank, src_traced, dst_traced);
415   }
416 #endif
417
418   log_timed_action (action, clock);
419 }
420
421 static void action_waitall(const char *const *action){
422   double clock = smpi_process_simulated_elapsed();
423   int count_requests=0;
424   unsigned int i=0;
425
426   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
427
428   if (count_requests>0) {
429     MPI_Request requests[count_requests];
430     MPI_Status status[count_requests];
431
432     /*  The reqq is an array of dynars. Its index corresponds to the rank.
433      Thus each rank saves its own requests to the array request. */
434     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
435
436   #ifdef HAVE_TRACING
437    //save information from requests
438
439    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
440    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
441    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
442    for (i = 0; i < count_requests; i++) {
443     if(requests[i]){
444       int *asrc = xbt_new(int, 1);
445       int *adst = xbt_new(int, 1);
446       int *arecv = xbt_new(int, 1);
447       *asrc = requests[i]->src;
448       *adst = requests[i]->dst;
449       *arecv = requests[i]->recv;
450       xbt_dynar_insert_at(srcs, i, asrc);
451       xbt_dynar_insert_at(dsts, i, adst);
452       xbt_dynar_insert_at(recvs, i, arecv);
453       xbt_free(asrc);
454       xbt_free(adst);
455       xbt_free(arecv);
456     }else {
457       int *t = xbt_new(int, 1);
458       xbt_dynar_insert_at(srcs, i, t);
459       xbt_dynar_insert_at(dsts, i, t);
460       xbt_dynar_insert_at(recvs, i, t);
461       xbt_free(t);
462     }
463    }
464    int rank_traced = smpi_process_index();
465    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
466    extra->type = TRACING_WAITALL;
467    extra->send_size=count_requests;
468    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
469  #endif
470
471     smpi_mpi_waitall(count_requests, requests, status);
472
473   #ifdef HAVE_TRACING
474    for (i = 0; i < count_requests; i++) {
475     int src_traced, dst_traced, is_wait_for_receive;
476     xbt_dynar_get_cpy(srcs, i, &src_traced);
477     xbt_dynar_get_cpy(dsts, i, &dst_traced);
478     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
479     if (is_wait_for_receive) {
480       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
481     }
482    }
483    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
484    //clean-up of dynars
485    xbt_dynar_free(&srcs);
486    xbt_dynar_free(&dsts);
487    xbt_dynar_free(&recvs);
488   #endif
489
490    int freedrank=smpi_process_index();
491    xbt_dynar_free_container(&(reqq[freedrank]));
492    reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
493   }
494   log_timed_action (action, clock);
495 }
496
497 static void action_barrier(const char *const *action){
498   double clock = smpi_process_simulated_elapsed();
499 #ifdef HAVE_TRACING
500   int rank = smpi_process_index();
501   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
502   extra->type = TRACING_BARRIER;
503   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
504 #endif
505   mpi_coll_barrier_fun(MPI_COMM_WORLD);
506 #ifdef HAVE_TRACING
507   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
508 #endif
509
510   log_timed_action (action, clock);
511 }
512
513
514 static void action_bcast(const char *const *action)
515 {
516   double size = parse_double(action[2]);
517   double clock = smpi_process_simulated_elapsed();
518   int root=0;
519   /*
520    * Initialize MPI_CURRENT_TYPE in order to decrease
521    * the number of the checks
522    * */
523   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
524
525   if(action[3]) {
526     root= atoi(action[3]);
527     if(action[4]) {
528       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
529     }
530   }
531
532 #ifdef HAVE_TRACING
533   int rank = smpi_process_index();
534   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
535
536   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537   extra->type = TRACING_BCAST;
538   extra->send_size = size;
539   extra->root = root_traced;
540   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
541   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
542
543 #endif
544     void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
545   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
546 #ifdef HAVE_TRACING
547   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
548 #endif
549   log_timed_action (action, clock);
550 }
551
552 static void action_reduce(const char *const *action)
553 {
554   double comm_size = parse_double(action[2]);
555   double comp_size = parse_double(action[3]);
556   double clock = smpi_process_simulated_elapsed();
557   int root=0;
558   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
559
560   if(action[4]) {
561     root= atoi(action[4]);
562     if(action[5]) {
563       MPI_CURRENT_TYPE=decode_datatype(action[5]);
564     }
565   }
566   
567   
568
569 #ifdef HAVE_TRACING
570   int rank = smpi_process_index();
571   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
572   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573   extra->type = TRACING_REDUCE;
574   extra->send_size = comm_size;
575   extra->comp_size = comp_size;
576   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
577   extra->root = root_traced;
578
579   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
580 #endif
581     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
582     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
583    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
584    smpi_execute_flops(comp_size);
585 #ifdef HAVE_TRACING
586   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
587 #endif
588   log_timed_action (action, clock);
589 }
590
591 static void action_allReduce(const char *const *action) {
592   double comm_size = parse_double(action[2]);
593   double comp_size = parse_double(action[3]);
594
595   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
596   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
597
598   double clock = smpi_process_simulated_elapsed();
599 #ifdef HAVE_TRACING
600   int rank = smpi_process_index();
601   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
602   extra->type = TRACING_ALLREDUCE;
603   extra->send_size = comm_size;
604   extra->comp_size = comp_size;
605   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
606
607   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
608 #endif
609     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
610     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
611   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
612   smpi_execute_flops(comp_size);
613 #ifdef HAVE_TRACING
614   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
615 #endif
616   log_timed_action (action, clock);
617 }
618
619 static void action_allToAll(const char *const *action) {
620   double clock = smpi_process_simulated_elapsed();
621   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
622   int send_size = parse_double(action[2]);
623   int recv_size = parse_double(action[3]);
624   MPI_Datatype MPI_CURRENT_TYPE2;
625
626   if(action[4]) {
627     MPI_CURRENT_TYPE=decode_datatype(action[4]);
628     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
629   }
630   else {
631     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
632     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
633   }
634   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
636
637 #ifdef HAVE_TRACING
638   int rank = smpi_process_index();
639   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
640   extra->type = TRACING_ALLTOALL;
641   extra->send_size = send_size;
642   extra->recv_size = recv_size;
643   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
644   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
645
646   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
647 #endif
648
649   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
650
651 #ifdef HAVE_TRACING
652   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
653 #endif
654   log_timed_action (action, clock);
655
656 }
657
658
659 static void action_gather(const char *const *action) {
660   /*
661  The structure of the gather action for the rank 0 (total 4 processes) 
662  is the following:   
663  0 gather 68 68 0 0 0
664
665   where: 
666   1) 68 is the sendcounts
667   2) 68 is the recvcounts
668   3) 0 is the root node
669   4) 0 is the send datatype id, see decode_datatype()
670   5) 0 is the recv datatype id, see decode_datatype()
671   */
672   double clock = smpi_process_simulated_elapsed();
673   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
674   int send_size = parse_double(action[2]);
675   int recv_size = parse_double(action[3]);
676   MPI_Datatype MPI_CURRENT_TYPE2;
677   if(action[5]) {
678     MPI_CURRENT_TYPE=decode_datatype(action[5]);
679     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
680   } else {
681     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
682     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
683   }
684   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
685   void *recv = NULL;
686   int root=0;
687   if(action[4])
688     root=atoi(action[4]);
689   int rank = smpi_comm_rank(MPI_COMM_WORLD);
690
691   if(rank==root)
692     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
693
694 #ifdef HAVE_TRACING
695   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
696   extra->type = TRACING_GATHER;
697   extra->send_size = send_size;
698   extra->recv_size = recv_size;
699   extra->root = root;
700   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
701   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
702
703   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
704 #endif
705   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
706                 recv, recv_size, MPI_CURRENT_TYPE2,
707                 root, MPI_COMM_WORLD);
708
709 #ifdef HAVE_TRACING
710   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
711 #endif
712   log_timed_action (action, clock);
713
714 }
715
716
717
718 static void action_gatherv(const char *const *action) {
719   /*
720  The structure of the gatherv action for the rank 0 (total 4 processes)
721  is the following:
722  0 gather 68 68 10 10 10 0 0 0
723
724   where:
725   1) 68 is the sendcount
726   2) 68 10 10 10 is the recvcounts
727   3) 0 is the root node
728   4) 0 is the send datatype id, see decode_datatype()
729   5) 0 is the recv datatype id, see decode_datatype()
730   */
731   double clock = smpi_process_simulated_elapsed();
732   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
733   int send_size = parse_double(action[2]);
734   int *disps = xbt_new0(int, comm_size);
735   int *recvcounts = xbt_new0(int, comm_size);
736   int i=0,recv_sum=0;
737
738   MPI_Datatype MPI_CURRENT_TYPE2;
739   if(action[4+comm_size]) {
740     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
741     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
742   } else {
743     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
744     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
745   }
746   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
747   void *recv = NULL;
748   for(i=0;i<comm_size;i++) {
749     recvcounts[i] = atoi(action[i+3]);
750     recv_sum=recv_sum+recvcounts[i];
751     disps[i] = 0;
752   }
753
754   int root=atoi(action[3+comm_size]);
755   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
756
757   if(rank==root)
758     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
759
760 #ifdef HAVE_TRACING
761   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
762   extra->type = TRACING_GATHERV;
763   extra->send_size = send_size;
764   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
765   for(i=0; i< comm_size; i++)//copy data to avoid bad free
766     extra->recvcounts[i] = recvcounts[i];
767   extra->root = root;
768   extra->num_processes = comm_size;
769   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
770   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
771
772   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
773 #endif
774 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
775                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
776                 root, MPI_COMM_WORLD);
777
778 #ifdef HAVE_TRACING
779   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
780 #endif
781
782   log_timed_action (action, clock);
783   xbt_free(recvcounts);
784   xbt_free(disps);
785 }
786
787 static void action_reducescatter(const char *const *action) {
788
789     /*
790  The structure of the reducescatter action for the rank 0 (total 4 processes) 
791  is the following:   
792 0 reduceScatter 275427 275427 275427 204020 11346849 0
793
794   where: 
795   1) The first four values after the name of the action declare the recvcounts array
796   2) The value 11346849 is the amount of instructions
797   3) The last value corresponds to the datatype, see decode_datatype().
798
799   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
800
801    */
802
803   double clock = smpi_process_simulated_elapsed();
804   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
805   int comp_size = parse_double(action[2+comm_size]);
806   int *recvcounts = xbt_new0(int, comm_size);  
807   int *disps = xbt_new0(int, comm_size);  
808   int i=0;
809   int rank = smpi_process_index();
810   int size = 0;
811   if(action[3+comm_size])
812     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
813   else
814     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
815
816   for(i=0;i<comm_size;i++) {
817     recvcounts[i] = atoi(action[i+2]);
818     disps[i] = 0;
819     size+=recvcounts[i];
820   }
821
822 #ifdef HAVE_TRACING
823   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
824   extra->type = TRACING_REDUCE_SCATTER;
825   extra->send_size = 0;
826   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
827   for(i=0; i< comm_size; i++)//copy data to avoid bad free
828     extra->recvcounts[i] = recvcounts[i];
829   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
830   extra->comp_size = comp_size;
831   extra->num_processes = comm_size;
832
833   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
834 #endif
835   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
836    void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
837    
838    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
839        MPI_COMM_WORLD);
840    smpi_execute_flops(comp_size);
841
842
843 #ifdef HAVE_TRACING
844   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
845 #endif
846   xbt_free(recvcounts);
847   xbt_free(disps);
848   log_timed_action (action, clock);
849 }
850
851
852 static void action_allgatherv(const char *const *action) {
853
854   /*
855  The structure of the allgatherv action for the rank 0 (total 4 processes) 
856  is the following:   
857 0 allGatherV 275427 275427 275427 275427 204020
858
859   where: 
860   1) 275427 is the sendcount
861   2) The next four elements declare the recvcounts array
862   3) No more values mean that the datatype for sent and receive buffer
863   is the default one, see decode_datatype().
864
865    */
866
867   double clock = smpi_process_simulated_elapsed();
868
869   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
870   int i=0;
871   int sendcount=atoi(action[2]);
872   int *recvcounts = xbt_new0(int, comm_size);  
873   int *disps = xbt_new0(int, comm_size);  
874   int recv_sum=0;  
875   MPI_Datatype MPI_CURRENT_TYPE2;
876
877   if(action[3+comm_size]) {
878     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
879     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
880   } else {
881     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
882     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
883   }
884   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
885
886   for(i=0;i<comm_size;i++) {
887     recvcounts[i] = atoi(action[i+3]);
888     recv_sum=recv_sum+recvcounts[i];
889   }
890   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
891
892 #ifdef HAVE_TRACING
893   int rank = smpi_process_index();
894   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
895   extra->type = TRACING_ALLGATHERV;
896   extra->send_size = sendcount;
897   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
898   for(i=0; i< comm_size; i++)//copy data to avoid bad free
899     extra->recvcounts[i] = recvcounts[i];
900   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
901   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
902   extra->num_processes = comm_size;
903
904   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
905 #endif
906
907   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
908
909 #ifdef HAVE_TRACING
910   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
911 #endif
912
913   log_timed_action (action, clock);
914   xbt_free(recvcounts);
915   xbt_free(disps);
916 }
917
918
919 static void action_allToAllv(const char *const *action) {
920   /*
921  The structure of the allToAllV action for the rank 0 (total 4 processes) 
922  is the following:   
923   0 allToAllV 100 1 7 10 12 100 1 70 10 5
924
925   where: 
926   1) 100 is the size of the send buffer *sizeof(int),
927   2) 1 7 10 12 is the sendcounts array
928   3) 100*sizeof(int) is the size of the receiver buffer
929   4)  1 70 10 5 is the recvcounts array
930
931    */
932
933
934   double clock = smpi_process_simulated_elapsed();
935
936   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
937   int send_buf_size=0,recv_buf_size=0,i=0;
938   int *sendcounts = xbt_new0(int, comm_size);  
939   int *recvcounts = xbt_new0(int, comm_size);  
940   int *senddisps = xbt_new0(int, comm_size);  
941   int *recvdisps = xbt_new0(int, comm_size);  
942
943   MPI_Datatype MPI_CURRENT_TYPE2;
944
945   send_buf_size=parse_double(action[2]);
946   recv_buf_size=parse_double(action[3+comm_size]);
947   if(action[4+2*comm_size]) {
948     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
949     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
950   }
951   else {
952       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
953       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
954   }
955
956   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
957   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
958
959   for(i=0;i<comm_size;i++) {
960     sendcounts[i] = atoi(action[i+3]);
961     recvcounts[i] = atoi(action[i+4+comm_size]);
962   }
963
964
965 #ifdef HAVE_TRACING
966   int rank = smpi_process_index();
967   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
968   extra->type = TRACING_ALLTOALLV;
969   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
970   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
971   extra->num_processes = comm_size;
972
973   for(i=0; i< comm_size; i++){//copy data to avoid bad free
974     extra->send_size += sendcounts[i];
975     extra->sendcounts[i] = sendcounts[i];
976     extra->recv_size += recvcounts[i];
977     extra->recvcounts[i] = recvcounts[i];
978   }
979   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
980   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
981
982   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
983 #endif
984   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
985                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
986                                MPI_COMM_WORLD);
987 #ifdef HAVE_TRACING
988   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
989 #endif
990
991   log_timed_action (action, clock);
992   xbt_free(sendcounts);
993   xbt_free(recvcounts);
994   xbt_free(senddisps);
995   xbt_free(recvdisps);
996 }
997
998 void smpi_replay_init(int *argc, char***argv){
999   smpi_process_init(argc, argv);
1000   smpi_process_mark_as_initialized();
1001   smpi_process_set_replaying(1);
1002 #ifdef HAVE_TRACING
1003   int rank = smpi_process_index();
1004   TRACE_smpi_init(rank);
1005   TRACE_smpi_computing_init(rank);
1006   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1007   extra->type = TRACING_INIT;
1008   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1009   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1010 #endif
1011
1012   if (!smpi_process_index()){
1013     _xbt_replay_action_init();
1014     xbt_replay_action_register("init",       action_init);
1015     xbt_replay_action_register("finalize",   action_finalize);
1016     xbt_replay_action_register("comm_size",  action_comm_size);
1017     xbt_replay_action_register("comm_split", action_comm_split);
1018     xbt_replay_action_register("comm_dup",   action_comm_dup);
1019     xbt_replay_action_register("send",       action_send);
1020     xbt_replay_action_register("Isend",      action_Isend);
1021     xbt_replay_action_register("recv",       action_recv);
1022     xbt_replay_action_register("Irecv",      action_Irecv);
1023     xbt_replay_action_register("test",       action_test);
1024     xbt_replay_action_register("wait",       action_wait);
1025     xbt_replay_action_register("waitAll",    action_waitall);
1026     xbt_replay_action_register("barrier",    action_barrier);
1027     xbt_replay_action_register("bcast",      action_bcast);
1028     xbt_replay_action_register("reduce",     action_reduce);
1029     xbt_replay_action_register("allReduce",  action_allReduce);
1030     xbt_replay_action_register("allToAll",   action_allToAll);
1031     xbt_replay_action_register("allToAllV",  action_allToAllv);
1032     xbt_replay_action_register("gather",  action_gather);
1033     xbt_replay_action_register("gatherV",  action_gatherv);
1034     xbt_replay_action_register("allGatherV",  action_allgatherv);
1035     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1036     xbt_replay_action_register("compute",    action_compute);
1037   }
1038   
1039   //if we have a delayed start, sleep here.
1040   if(*argc>2){
1041     char *endptr;
1042     double value = strtod((*argv)[2], &endptr);
1043     if (*endptr != '\0')
1044       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1045     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1046     smpi_execute_flops(value);
1047   }
1048   xbt_replay_action_runner(*argc, *argv);
1049 }
1050
1051 int smpi_replay_finalize(){
1052   double sim_time= 1.;
1053   /* One active process will stop. Decrease the counter*/
1054   XBT_DEBUG("There are %lu elements in reqq[*]",
1055             xbt_dynar_length(reqq[smpi_process_index()]));
1056   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1057     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1058     MPI_Request requests[count_requests];
1059     MPI_Status status[count_requests];
1060     unsigned int i;
1061
1062     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1063     smpi_mpi_waitall(count_requests, requests, status);
1064     active_processes--;
1065   } else {
1066     active_processes--;
1067   }
1068
1069   if(!active_processes){
1070     /* Last process alive speaking */
1071     /* end the simulated timer */
1072     sim_time = smpi_process_simulated_elapsed();
1073   }
1074   
1075
1076   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1077
1078   if(!active_processes){
1079     XBT_INFO("Simulation time %f", sim_time);
1080     _xbt_replay_action_exit();
1081     xbt_free(sendbuffer);
1082     xbt_free(recvbuffer);
1083     xbt_free(reqq);
1084     reqq = NULL;
1085   }
1086   
1087
1088 #ifdef HAVE_TRACING
1089   int rank = smpi_process_index();
1090   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1091   extra->type = TRACING_FINALIZE;
1092   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1093 #endif
1094   smpi_process_finalize();
1095 #ifdef HAVE_TRACING
1096   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1097   TRACE_smpi_finalize(smpi_process_index());
1098 #endif
1099   smpi_process_destroy();
1100   return MPI_SUCCESS;
1101 }