Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
0dc2d78f3cea448635e358fdd2d9e4d12b079646
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36   if (!smpi_process_get_replaying())
37         return xbt_malloc(size);
38   if (sendbuffer_size<size){
39     sendbuffer=xbt_realloc(sendbuffer,size);
40     sendbuffer_size=size;
41   }
42   return sendbuffer;
43 }
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46   if (!smpi_process_get_replaying())
47         return xbt_malloc(size);
48   if (recvbuffer_size<size){
49     recvbuffer=xbt_realloc(recvbuffer,size);
50     recvbuffer_size=size;
51   }
52   return sendbuffer;
53 }
54
55 void smpi_free_tmp_buffer(void* buf){
56   if (!smpi_process_get_replaying())
57     xbt_free(buf);
58 }
59
60 /* Helper function */
61 static double parse_double(const char *string)
62 {
63   double value;
64   char *endptr;
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static MPI_Datatype decode_datatype(const char *const action)
72 {
73 // Declared datatypes,
74
75   switch(atoi(action))
76   {
77     case 0:
78       MPI_CURRENT_TYPE=MPI_DOUBLE;
79       break;
80     case 1:
81       MPI_CURRENT_TYPE=MPI_INT;
82       break;
83     case 2:
84       MPI_CURRENT_TYPE=MPI_CHAR;
85       break;
86     case 3:
87       MPI_CURRENT_TYPE=MPI_SHORT;
88       break;
89     case 4:
90       MPI_CURRENT_TYPE=MPI_LONG;
91       break;
92     case 5:
93       MPI_CURRENT_TYPE=MPI_FLOAT;
94       break;
95     case 6:
96       MPI_CURRENT_TYPE=MPI_BYTE;
97       break;
98     default:
99       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
100
101   }
102    return MPI_CURRENT_TYPE;
103 }
104
105
106 const char* encode_datatype(MPI_Datatype datatype)
107 {
108
109   //default type for output is set to MPI_BYTE
110   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111   if (datatype==MPI_BYTE){
112       return "";
113   }
114   if(datatype==MPI_DOUBLE)
115       return "0";
116   if(datatype==MPI_INT)
117       return "1";
118   if(datatype==MPI_CHAR)
119       return "2";
120   if(datatype==MPI_SHORT)
121       return "3";
122   if(datatype==MPI_LONG)
123     return "4";
124   if(datatype==MPI_FLOAT)
125       return "5";
126
127   // default - not implemented.
128   // do not warn here as we pass in this function even for other trace formats
129   return "-1";
130 }
131
132 static void action_init(const char *const *action)
133 {
134   int i;
135   XBT_DEBUG("Initialize the counters");
136
137   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
138   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
139
140   /* start a simulated timer */
141   smpi_process_simulated_start();
142   /*initialize the number of active processes */
143   active_processes = smpi_process_count();
144
145   if (!reqq) {
146     reqq=xbt_new0(xbt_dynar_t,active_processes);
147
148     for(i=0;i<active_processes;i++){
149       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
150     }
151   }
152 }
153
154 static void action_finalize(const char *const *action)
155 {
156 }
157
158 static void action_comm_size(const char *const *action)
159 {
160   double clock = smpi_process_simulated_elapsed();
161
162   communicator_size = parse_double(action[2]);
163   log_timed_action (action, clock);
164 }
165
166 static void action_comm_split(const char *const *action)
167 {
168   double clock = smpi_process_simulated_elapsed();
169
170   log_timed_action (action, clock);
171 }
172
173 static void action_comm_dup(const char *const *action)
174 {
175   double clock = smpi_process_simulated_elapsed();
176
177   log_timed_action (action, clock);
178 }
179
180 static void action_compute(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183   double flops= parse_double(action[2]);
184 #ifdef HAVE_TRACING
185   int rank = smpi_process_index();
186   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187   extra->type=TRACING_COMPUTING;
188   extra->comp_size=flops;
189   TRACE_smpi_computing_in(rank, extra);
190 #endif
191   smpi_execute_flops(flops);
192 #ifdef HAVE_TRACING
193   TRACE_smpi_computing_out(rank);
194 #endif
195
196   log_timed_action (action, clock);
197 }
198
199 static void action_send(const char *const *action)
200 {
201   int to = atoi(action[2]);
202   double size=parse_double(action[3]);
203   double clock = smpi_process_simulated_elapsed();
204
205   if(action[4]) {
206     MPI_CURRENT_TYPE=decode_datatype(action[4]);
207   } else {
208     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
209   }
210
211 #ifdef HAVE_TRACING
212   int rank = smpi_process_index();
213
214   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216   extra->type = TRACING_SEND;
217   extra->send_size = size;
218   extra->src = rank;
219   extra->dst = dst_traced;
220   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
223 #endif
224
225   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
226
227   log_timed_action (action, clock);
228
229   #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231 #endif
232
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process_simulated_elapsed();
240   MPI_Request request;
241
242   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
244
245 #ifdef HAVE_TRACING
246   int rank = smpi_process_index();
247   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249   extra->type = TRACING_ISEND;
250   extra->send_size = size;
251   extra->src = rank;
252   extra->dst = dst_traced;
253   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
256 #endif
257
258   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
259
260 #ifdef HAVE_TRACING
261   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
262   request->send = 1;
263 #endif
264
265   xbt_dynar_push(reqq[smpi_process_index()],&request);
266
267   log_timed_action (action, clock);
268 }
269
270 static void action_recv(const char *const *action) {
271   int from = atoi(action[2]);
272   double size=parse_double(action[3]);
273   double clock = smpi_process_simulated_elapsed();
274   MPI_Status status;
275
276   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279 #ifdef HAVE_TRACING
280   int rank = smpi_process_index();
281   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
282
283   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284   extra->type = TRACING_RECV;
285   extra->send_size = size;
286   extra->src = src_traced;
287   extra->dst = rank;
288   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
290 #endif
291
292   //unknow size from the receiver pov
293   if(size==-1){
294       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
295       size=status.count;
296   }
297
298   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
299
300 #ifdef HAVE_TRACING
301   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302   TRACE_smpi_recv(rank, src_traced, rank);
303 #endif
304
305   log_timed_action (action, clock);
306 }
307
308 static void action_Irecv(const char *const *action)
309 {
310   int from = atoi(action[2]);
311   double size=parse_double(action[3]);
312   double clock = smpi_process_simulated_elapsed();
313   MPI_Request request;
314
315   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
317
318 #ifdef HAVE_TRACING
319   int rank = smpi_process_index();
320   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322   extra->type = TRACING_IRECV;
323   extra->send_size = size;
324   extra->src = src_traced;
325   extra->dst = rank;
326   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
328 #endif
329   MPI_Status status;
330   //unknow size from the receiver pov
331   if(size==-1){
332       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
333       size=status.count;
334   }
335
336   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
340   request->recv = 1;
341 #endif
342   xbt_dynar_push(reqq[smpi_process_index()],&request);
343
344   log_timed_action (action, clock);
345 }
346
347 static void action_test(const char *const *action){
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Request request;
350   MPI_Status status;
351   int flag = TRUE;
352
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354   //if request is null here, this may mean that a previous test has succeeded 
355   //Different times in traced application and replayed version may lead to this 
356   //In this case, ignore the extra calls.
357   if(request){
358 #ifdef HAVE_TRACING
359   int rank = smpi_process_index();
360   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361   extra->type=TRACING_TEST;
362   TRACE_smpi_testing_in(rank, extra);
363 #endif
364
365     flag = smpi_mpi_test(&request, &status);
366
367   XBT_DEBUG("MPI_Test result: %d", flag);
368   /* push back request in dynar to be caught by a subsequent wait. if the test
369    * did succeed, the request is now NULL.
370    */
371   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
372
373 #ifdef HAVE_TRACING
374   TRACE_smpi_testing_out(rank);
375 #endif
376   }
377   log_timed_action (action, clock);
378 }
379
380 static void action_wait(const char *const *action){
381   double clock = smpi_process_simulated_elapsed();
382   MPI_Request request;
383   MPI_Status status;
384
385   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
386       "action wait not preceded by any irecv or isend: %s",
387       xbt_str_join_array(action," "));
388   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
389
390   if (!request){
391     /* Assuming that the trace is well formed, this mean the comm might have
392      * been caught by a MPI_test. Then just return.
393      */
394     return;
395   }
396
397 #ifdef HAVE_TRACING
398   int rank = request->comm != MPI_COMM_NULL
399       ? smpi_comm_rank(request->comm)
400       : -1;
401
402   MPI_Group group = smpi_comm_group(request->comm);
403   int src_traced = smpi_group_rank(group, request->src);
404   int dst_traced = smpi_group_rank(group, request->dst);
405   int is_wait_for_receive = request->recv;
406   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
407   extra->type = TRACING_WAIT;
408   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
409 #endif
410   smpi_mpi_wait(&request, &status);
411 #ifdef HAVE_TRACING
412   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
413   if (is_wait_for_receive) {
414     TRACE_smpi_recv(rank, src_traced, dst_traced);
415   }
416 #endif
417
418   log_timed_action (action, clock);
419 }
420
421 static void action_waitall(const char *const *action){
422   double clock = smpi_process_simulated_elapsed();
423   int count_requests=0;
424   unsigned int i=0;
425
426   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
427
428   if (count_requests>0) {
429     MPI_Request requests[count_requests];
430     MPI_Status status[count_requests];
431
432     /*  The reqq is an array of dynars. Its index corresponds to the rank.
433      Thus each rank saves its own requests to the array request. */
434     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
435
436   #ifdef HAVE_TRACING
437    //save information from requests
438
439    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
440    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
441    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
442    for (i = 0; i < count_requests; i++) {
443     if(requests[i]){
444       int *asrc = xbt_new(int, 1);
445       int *adst = xbt_new(int, 1);
446       int *arecv = xbt_new(int, 1);
447       *asrc = requests[i]->src;
448       *adst = requests[i]->dst;
449       *arecv = requests[i]->recv;
450       xbt_dynar_insert_at(srcs, i, asrc);
451       xbt_dynar_insert_at(dsts, i, adst);
452       xbt_dynar_insert_at(recvs, i, arecv);
453       xbt_free(asrc);
454       xbt_free(adst);
455       xbt_free(arecv);
456     }else {
457       int *t = xbt_new(int, 1);
458       xbt_dynar_insert_at(srcs, i, t);
459       xbt_dynar_insert_at(dsts, i, t);
460       xbt_dynar_insert_at(recvs, i, t);
461       xbt_free(t);
462     }
463    }
464    int rank_traced = smpi_process_index();
465    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
466    extra->type = TRACING_WAITALL;
467    extra->send_size=count_requests;
468    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
469  #endif
470
471     smpi_mpi_waitall(count_requests, requests, status);
472
473   #ifdef HAVE_TRACING
474    for (i = 0; i < count_requests; i++) {
475     int src_traced, dst_traced, is_wait_for_receive;
476     xbt_dynar_get_cpy(srcs, i, &src_traced);
477     xbt_dynar_get_cpy(dsts, i, &dst_traced);
478     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
479     if (is_wait_for_receive) {
480       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
481     }
482    }
483    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
484    //clean-up of dynars
485    xbt_dynar_free(&srcs);
486    xbt_dynar_free(&dsts);
487    xbt_dynar_free(&recvs);
488   #endif
489
490    int freedrank=smpi_process_index();
491    xbt_dynar_free_container(&(reqq[freedrank]));
492    reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
493   }
494   log_timed_action (action, clock);
495 }
496
497 static void action_barrier(const char *const *action){
498   double clock = smpi_process_simulated_elapsed();
499 #ifdef HAVE_TRACING
500   int rank = smpi_process_index();
501   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
502   extra->type = TRACING_BARRIER;
503   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
504 #endif
505   mpi_coll_barrier_fun(MPI_COMM_WORLD);
506 #ifdef HAVE_TRACING
507   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
508 #endif
509
510   log_timed_action (action, clock);
511 }
512
513
514 static void action_bcast(const char *const *action)
515 {
516   double size = parse_double(action[2]);
517   double clock = smpi_process_simulated_elapsed();
518   int root=0;
519   /*
520    * Initialize MPI_CURRENT_TYPE in order to decrease
521    * the number of the checks
522    * */
523   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
524
525   if(action[3]) {
526     root= atoi(action[3]);
527     if(action[4]) {
528       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
529     }
530   }
531
532 #ifdef HAVE_TRACING
533   int rank = smpi_process_index();
534   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
535
536   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537   extra->type = TRACING_BCAST;
538   extra->send_size = size;
539   extra->root = root_traced;
540   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
541   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
542
543 #endif
544     void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
545   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
546 #ifdef HAVE_TRACING
547   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
548 #endif
549   log_timed_action (action, clock);
550 }
551
552 static void action_reduce(const char *const *action)
553 {
554   double comm_size = parse_double(action[2]);
555   double comp_size = parse_double(action[3]);
556   double clock = smpi_process_simulated_elapsed();
557   int root=0;
558   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
559
560   if(action[4]) {
561     root= atoi(action[4]);
562     if(action[5]) {
563       MPI_CURRENT_TYPE=decode_datatype(action[5]);
564     }
565   }
566   
567   
568
569 #ifdef HAVE_TRACING
570   int rank = smpi_process_index();
571   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
572   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573   extra->type = TRACING_REDUCE;
574   extra->send_size = comm_size;
575   extra->comp_size = comp_size;
576   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
577   extra->root = root_traced;
578
579   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
580 #endif
581     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
582     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
583    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
584    smpi_execute_flops(comp_size);
585 #ifdef HAVE_TRACING
586   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
587 #endif
588   log_timed_action (action, clock);
589 }
590
591 static void action_allReduce(const char *const *action) {
592   double comm_size = parse_double(action[2]);
593   double comp_size = parse_double(action[3]);
594
595   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
596   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
597
598   double clock = smpi_process_simulated_elapsed();
599 #ifdef HAVE_TRACING
600   int rank = smpi_process_index();
601   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
602   extra->type = TRACING_ALLREDUCE;
603   extra->send_size = comm_size;
604   extra->comp_size = comp_size;
605   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
606
607   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
608 #endif
609     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
610     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
611   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
612   smpi_execute_flops(comp_size);
613 #ifdef HAVE_TRACING
614   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
615 #endif
616   log_timed_action (action, clock);
617 }
618
619 static void action_allToAll(const char *const *action) {
620   double clock = smpi_process_simulated_elapsed();
621   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
622   int send_size = parse_double(action[2]);
623   int recv_size = parse_double(action[3]);
624   MPI_Datatype MPI_CURRENT_TYPE2;
625
626   if(action[4]) {
627     MPI_CURRENT_TYPE=decode_datatype(action[4]);
628     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
629   }
630   else {
631     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
632     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
633   }
634   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
636
637 #ifdef HAVE_TRACING
638   int rank = smpi_process_index();
639   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
640   extra->type = TRACING_ALLTOALL;
641   extra->send_size = send_size;
642   extra->recv_size = recv_size;
643   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
644   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
645
646   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
647 #endif
648
649   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
650
651 #ifdef HAVE_TRACING
652   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
653 #endif
654   log_timed_action (action, clock);
655
656 }
657
658
659 static void action_gather(const char *const *action) {
660   /*
661  The structure of the gather action for the rank 0 (total 4 processes) 
662  is the following:   
663  0 gather 68 68 0 0 0
664
665   where: 
666   1) 68 is the sendcounts
667   2) 68 is the recvcounts
668   3) 0 is the root node
669   4) 0 is the send datatype id, see decode_datatype()
670   5) 0 is the recv datatype id, see decode_datatype()
671   */
672   double clock = smpi_process_simulated_elapsed();
673   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
674   int send_size = parse_double(action[2]);
675   int recv_size = parse_double(action[3]);
676   MPI_Datatype MPI_CURRENT_TYPE2;
677   if(action[5]) {
678     MPI_CURRENT_TYPE=decode_datatype(action[5]);
679     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
680   } else {
681     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
682     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
683   }
684   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
685   void *recv = NULL;
686   int root=0;
687   if(action[4])
688     root=atoi(action[4]);
689   int rank = smpi_comm_rank(MPI_COMM_WORLD);
690
691   if(rank==root)
692     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
693
694 #ifdef HAVE_TRACING
695   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
696   extra->type = TRACING_GATHER;
697   extra->send_size = send_size;
698   extra->recv_size = recv_size;
699   extra->root = root;
700   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
701   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
702
703   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
704 #endif
705   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
706                 recv, recv_size, MPI_CURRENT_TYPE2,
707                 root, MPI_COMM_WORLD);
708
709 #ifdef HAVE_TRACING
710   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
711 #endif
712   log_timed_action (action, clock);
713
714 }
715
716
717
718 static void action_gatherv(const char *const *action) {
719   /*
720  The structure of the gatherv action for the rank 0 (total 4 processes)
721  is the following:
722  0 gather 68 68 10 10 10 0 0 0
723
724   where:
725   1) 68 is the sendcount
726   2) 68 10 10 10 is the recvcounts
727   3) 0 is the root node
728   4) 0 is the send datatype id, see decode_datatype()
729   5) 0 is the recv datatype id, see decode_datatype()
730   */
731   double clock = smpi_process_simulated_elapsed();
732   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
733   int send_size = parse_double(action[2]);
734   int *disps = xbt_new0(int, comm_size);
735   int *recvcounts = xbt_new0(int, comm_size);
736   int i=0,recv_sum=0;
737
738   MPI_Datatype MPI_CURRENT_TYPE2;
739   if(action[4+comm_size]) {
740     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
741     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
742   } else {
743     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
744     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
745   }
746   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
747   void *recv = NULL;
748   for(i=0;i<comm_size;i++) {
749     recvcounts[i] = atoi(action[i+3]);
750     recv_sum=recv_sum+recvcounts[i];
751     disps[i] = 0;
752   }
753
754   int root=atoi(action[3+comm_size]);
755   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
756
757   if(rank==root)
758     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
759
760 #ifdef HAVE_TRACING
761   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
762   extra->type = TRACING_GATHERV;
763   extra->send_size = send_size;
764   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
765   for(i=0; i< comm_size; i++)//copy data to avoid bad free
766     extra->recvcounts[i] = recvcounts[i];
767   extra->root = root;
768   extra->num_processes = comm_size;
769   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
770   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
771
772   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
773 #endif
774 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
775                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
776                 root, MPI_COMM_WORLD);
777
778 #ifdef HAVE_TRACING
779   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
780 #endif
781
782   log_timed_action (action, clock);
783   xbt_free(recvcounts);
784   xbt_free(disps);
785 }
786
787 static void action_reducescatter(const char *const *action) {
788
789     /*
790  The structure of the reducescatter action for the rank 0 (total 4 processes) 
791  is the following:   
792 0 reduceScatter 275427 275427 275427 204020 11346849 0
793
794   where: 
795   1) The first four values after the name of the action declare the recvcounts array
796   2) The value 11346849 is the amount of instructions
797   3) The last value corresponds to the datatype, see decode_datatype().
798
799   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
800
801    */
802
803   double clock = smpi_process_simulated_elapsed();
804   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
805   int comp_size = parse_double(action[2+comm_size]);
806   int *recvcounts = xbt_new0(int, comm_size);  
807   int *disps = xbt_new0(int, comm_size);  
808   int i=0;
809   int rank = smpi_process_index();
810   int size = 0;
811   if(action[3+comm_size])
812     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
813   else
814     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
815
816   for(i=0;i<comm_size;i++) {
817     recvcounts[i] = atoi(action[i+2]);
818     disps[i] = 0;
819     size+=recvcounts[i];
820   }
821
822 #ifdef HAVE_TRACING
823   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
824   extra->type = TRACING_REDUCE_SCATTER;
825   extra->send_size = 0;
826   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
827   for(i=0; i< comm_size; i++)//copy data to avoid bad free
828     extra->recvcounts[i] = recvcounts[i];
829   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
830   extra->comp_size = comp_size;
831   extra->num_processes = comm_size;
832
833   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
834 #endif
835   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
836    void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
837    
838    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
839        MPI_COMM_WORLD);
840    smpi_execute_flops(comp_size);
841
842
843 #ifdef HAVE_TRACING
844   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
845 #endif
846   xbt_free(recvcounts);
847   xbt_free(disps);
848   log_timed_action (action, clock);
849 }
850
851 static void action_allgather(const char *const *action) {
852   /*
853  The structure of the allgather action for the rank 0 (total 4 processes) 
854  is the following:   
855   0 allGather 275427 275427
856
857   where: 
858   1) 275427 is the sendcount
859   2) 275427 is the recvcount
860   3) No more values mean that the datatype for sent and receive buffer
861   is the default one, see decode_datatype().
862
863    */
864
865   double clock = smpi_process_simulated_elapsed();
866
867   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
868   int sendcount=atoi(action[2]); 
869   int recvcount=atoi(action[3]); 
870
871   MPI_Datatype MPI_CURRENT_TYPE2;
872
873   if(action[4]) {
874     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
875     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
876   } else {
877     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
878     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
879   }
880   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
881   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
882
883 #ifdef HAVE_TRACING
884   int rank = smpi_process_index();
885   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
886   extra->type = TRACING_ALLGATHER;
887   extra->send_size = sendcount;
888   extra->recv_size= recvcount;
889   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
890   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
891   extra->num_processes = comm_size;
892
893   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
894 #endif
895
896   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
897
898 #ifdef HAVE_TRACING
899   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
900 #endif
901
902   log_timed_action (action, clock);
903 }
904
905 static void action_allgatherv(const char *const *action) {
906
907   /*
908  The structure of the allgatherv action for the rank 0 (total 4 processes) 
909  is the following:   
910 0 allGatherV 275427 275427 275427 275427 204020
911
912   where: 
913   1) 275427 is the sendcount
914   2) The next four elements declare the recvcounts array
915   3) No more values mean that the datatype for sent and receive buffer
916   is the default one, see decode_datatype().
917
918    */
919
920   double clock = smpi_process_simulated_elapsed();
921
922   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
923   int i=0;
924   int sendcount=atoi(action[2]);
925   int *recvcounts = xbt_new0(int, comm_size);  
926   int *disps = xbt_new0(int, comm_size);  
927   int recv_sum=0;  
928   MPI_Datatype MPI_CURRENT_TYPE2;
929
930   if(action[3+comm_size]) {
931     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
932     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
933   } else {
934     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
935     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
936   }
937   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
938
939   for(i=0;i<comm_size;i++) {
940     recvcounts[i] = atoi(action[i+3]);
941     recv_sum=recv_sum+recvcounts[i];
942   }
943   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
944
945 #ifdef HAVE_TRACING
946   int rank = smpi_process_index();
947   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
948   extra->type = TRACING_ALLGATHERV;
949   extra->send_size = sendcount;
950   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
951   for(i=0; i< comm_size; i++)//copy data to avoid bad free
952     extra->recvcounts[i] = recvcounts[i];
953   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
954   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
955   extra->num_processes = comm_size;
956
957   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
958 #endif
959
960   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
961
962 #ifdef HAVE_TRACING
963   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
964 #endif
965
966   log_timed_action (action, clock);
967   xbt_free(recvcounts);
968   xbt_free(disps);
969 }
970
971 static void action_allToAllv(const char *const *action) {
972   /*
973  The structure of the allToAllV action for the rank 0 (total 4 processes) 
974  is the following:   
975   0 allToAllV 100 1 7 10 12 100 1 70 10 5
976
977   where: 
978   1) 100 is the size of the send buffer *sizeof(int),
979   2) 1 7 10 12 is the sendcounts array
980   3) 100*sizeof(int) is the size of the receiver buffer
981   4)  1 70 10 5 is the recvcounts array
982
983    */
984
985
986   double clock = smpi_process_simulated_elapsed();
987
988   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
989   int send_buf_size=0,recv_buf_size=0,i=0;
990   int *sendcounts = xbt_new0(int, comm_size);  
991   int *recvcounts = xbt_new0(int, comm_size);  
992   int *senddisps = xbt_new0(int, comm_size);  
993   int *recvdisps = xbt_new0(int, comm_size);  
994
995   MPI_Datatype MPI_CURRENT_TYPE2;
996
997   send_buf_size=parse_double(action[2]);
998   recv_buf_size=parse_double(action[3+comm_size]);
999   if(action[4+2*comm_size]) {
1000     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
1001     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
1002   }
1003   else {
1004       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
1005       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1006   }
1007
1008   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1009   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1010
1011   for(i=0;i<comm_size;i++) {
1012     sendcounts[i] = atoi(action[i+3]);
1013     recvcounts[i] = atoi(action[i+4+comm_size]);
1014   }
1015
1016
1017 #ifdef HAVE_TRACING
1018   int rank = smpi_process_index();
1019   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1020   extra->type = TRACING_ALLTOALLV;
1021   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1022   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1023   extra->num_processes = comm_size;
1024
1025   for(i=0; i< comm_size; i++){//copy data to avoid bad free
1026     extra->send_size += sendcounts[i];
1027     extra->sendcounts[i] = sendcounts[i];
1028     extra->recv_size += recvcounts[i];
1029     extra->recvcounts[i] = recvcounts[i];
1030   }
1031   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
1032   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
1033
1034   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1035 #endif
1036   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1037                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1038                                MPI_COMM_WORLD);
1039 #ifdef HAVE_TRACING
1040   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1041 #endif
1042
1043   log_timed_action (action, clock);
1044   xbt_free(sendcounts);
1045   xbt_free(recvcounts);
1046   xbt_free(senddisps);
1047   xbt_free(recvdisps);
1048 }
1049
1050 void smpi_replay_init(int *argc, char***argv){
1051   smpi_process_init(argc, argv);
1052   smpi_process_mark_as_initialized();
1053   smpi_process_set_replaying(1);
1054 #ifdef HAVE_TRACING
1055   int rank = smpi_process_index();
1056   TRACE_smpi_init(rank);
1057   TRACE_smpi_computing_init(rank);
1058   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1059   extra->type = TRACING_INIT;
1060   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1061   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1062 #endif
1063
1064   if (!smpi_process_index()){
1065     _xbt_replay_action_init();
1066     xbt_replay_action_register("init",       action_init);
1067     xbt_replay_action_register("finalize",   action_finalize);
1068     xbt_replay_action_register("comm_size",  action_comm_size);
1069     xbt_replay_action_register("comm_split", action_comm_split);
1070     xbt_replay_action_register("comm_dup",   action_comm_dup);
1071     xbt_replay_action_register("send",       action_send);
1072     xbt_replay_action_register("Isend",      action_Isend);
1073     xbt_replay_action_register("recv",       action_recv);
1074     xbt_replay_action_register("Irecv",      action_Irecv);
1075     xbt_replay_action_register("test",       action_test);
1076     xbt_replay_action_register("wait",       action_wait);
1077     xbt_replay_action_register("waitAll",    action_waitall);
1078     xbt_replay_action_register("barrier",    action_barrier);
1079     xbt_replay_action_register("bcast",      action_bcast);
1080     xbt_replay_action_register("reduce",     action_reduce);
1081     xbt_replay_action_register("allReduce",  action_allReduce);
1082     xbt_replay_action_register("allToAll",   action_allToAll);
1083     xbt_replay_action_register("allToAllV",  action_allToAllv);
1084     xbt_replay_action_register("gather",  action_gather);
1085     xbt_replay_action_register("gatherV",  action_gatherv);
1086     xbt_replay_action_register("allGather",  action_allgather);
1087     xbt_replay_action_register("allGatherV",  action_allgatherv);
1088     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1089     xbt_replay_action_register("compute",    action_compute);
1090   }
1091   
1092   //if we have a delayed start, sleep here.
1093   if(*argc>2){
1094     char *endptr;
1095     double value = strtod((*argv)[2], &endptr);
1096     if (*endptr != '\0')
1097       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1098     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1099     smpi_execute_flops(value);
1100   }
1101   xbt_replay_action_runner(*argc, *argv);
1102 }
1103
1104 int smpi_replay_finalize(){
1105   double sim_time= 1.;
1106   /* One active process will stop. Decrease the counter*/
1107   XBT_DEBUG("There are %lu elements in reqq[*]",
1108             xbt_dynar_length(reqq[smpi_process_index()]));
1109   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1110     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1111     MPI_Request requests[count_requests];
1112     MPI_Status status[count_requests];
1113     unsigned int i;
1114
1115     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1116     smpi_mpi_waitall(count_requests, requests, status);
1117     active_processes--;
1118   } else {
1119     active_processes--;
1120   }
1121
1122   if(!active_processes){
1123     /* Last process alive speaking */
1124     /* end the simulated timer */
1125     sim_time = smpi_process_simulated_elapsed();
1126   }
1127   
1128
1129   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1130
1131   if(!active_processes){
1132     XBT_INFO("Simulation time %f", sim_time);
1133     _xbt_replay_action_exit();
1134     xbt_free(sendbuffer);
1135     xbt_free(recvbuffer);
1136     xbt_free(reqq);
1137     reqq = NULL;
1138   }
1139   
1140
1141 #ifdef HAVE_TRACING
1142   int rank = smpi_process_index();
1143   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1144   extra->type = TRACING_FINALIZE;
1145   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1146 #endif
1147   smpi_process_finalize();
1148 #ifdef HAVE_TRACING
1149   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1150   TRACE_smpi_finalize(smpi_process_index());
1151 #endif
1152   smpi_process_destroy();
1153   return MPI_SUCCESS;
1154 }