Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
previous commit was a mistake. Handle the case more gracefully.
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36   if (!smpi_process_get_replaying())
37         return xbt_malloc(size);
38   if (sendbuffer_size<size){
39     sendbuffer=xbt_realloc(sendbuffer,size);
40     sendbuffer_size=size;
41   }
42   return sendbuffer;
43 }
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46   if (!smpi_process_get_replaying())
47         return xbt_malloc(size);
48   if (recvbuffer_size<size){
49     recvbuffer=xbt_realloc(recvbuffer,size);
50     recvbuffer_size=size;
51   }
52   return sendbuffer;
53 }
54
55 void smpi_free_tmp_buffer(void* buf){
56   if (!smpi_process_get_replaying())
57     xbt_free(buf);
58 }
59
60 /* Helper function */
61 static double parse_double(const char *string)
62 {
63   double value;
64   char *endptr;
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static MPI_Datatype decode_datatype(const char *const action)
72 {
73 // Declared datatypes,
74
75   switch(atoi(action))
76   {
77     case 0:
78       MPI_CURRENT_TYPE=MPI_DOUBLE;
79       break;
80     case 1:
81       MPI_CURRENT_TYPE=MPI_INT;
82       break;
83     case 2:
84       MPI_CURRENT_TYPE=MPI_CHAR;
85       break;
86     case 3:
87       MPI_CURRENT_TYPE=MPI_SHORT;
88       break;
89     case 4:
90       MPI_CURRENT_TYPE=MPI_LONG;
91       break;
92     case 5:
93       MPI_CURRENT_TYPE=MPI_FLOAT;
94       break;
95     case 6:
96       MPI_CURRENT_TYPE=MPI_BYTE;
97       break;
98     default:
99       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
100
101   }
102    return MPI_CURRENT_TYPE;
103 }
104
105
106 const char* encode_datatype(MPI_Datatype datatype)
107 {
108
109   //default type for output is set to MPI_BYTE
110   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111   if (datatype==MPI_BYTE){
112       return "";
113   }
114   if(datatype==MPI_DOUBLE)
115       return "0";
116   if(datatype==MPI_INT)
117       return "1";
118   if(datatype==MPI_CHAR)
119       return "2";
120   if(datatype==MPI_SHORT)
121       return "3";
122   if(datatype==MPI_LONG)
123     return "4";
124   if(datatype==MPI_FLOAT)
125       return "5";
126
127   // default - not implemented.
128   // do not warn here as we pass in this function even for other trace formats
129   return "-1";
130 }
131
132 static void action_init(const char *const *action)
133 {
134   int i;
135   XBT_DEBUG("Initialize the counters");
136
137   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
138   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
139
140   /* start a simulated timer */
141   smpi_process_simulated_start();
142   /*initialize the number of active processes */
143   active_processes = smpi_process_count();
144
145   if (!reqq) {
146     reqq=xbt_new0(xbt_dynar_t,active_processes);
147
148     for(i=0;i<active_processes;i++){
149       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
150     }
151   }
152 }
153
154 static void action_finalize(const char *const *action)
155 {
156 }
157
158 static void action_comm_size(const char *const *action)
159 {
160   double clock = smpi_process_simulated_elapsed();
161
162   communicator_size = parse_double(action[2]);
163   log_timed_action (action, clock);
164 }
165
166 static void action_comm_split(const char *const *action)
167 {
168   double clock = smpi_process_simulated_elapsed();
169
170   log_timed_action (action, clock);
171 }
172
173 static void action_comm_dup(const char *const *action)
174 {
175   double clock = smpi_process_simulated_elapsed();
176
177   log_timed_action (action, clock);
178 }
179
180 static void action_compute(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183   double flops= parse_double(action[2]);
184 #ifdef HAVE_TRACING
185   int rank = smpi_process_index();
186   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187   extra->type=TRACING_COMPUTING;
188   extra->comp_size=flops;
189   TRACE_smpi_computing_in(rank, extra);
190 #endif
191   smpi_execute_flops(flops);
192 #ifdef HAVE_TRACING
193   TRACE_smpi_computing_out(rank);
194 #endif
195
196   log_timed_action (action, clock);
197 }
198
199 static void action_send(const char *const *action)
200 {
201   int to = atoi(action[2]);
202   double size=parse_double(action[3]);
203   double clock = smpi_process_simulated_elapsed();
204
205   if(action[4]) {
206     MPI_CURRENT_TYPE=decode_datatype(action[4]);
207   } else {
208     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
209   }
210
211 #ifdef HAVE_TRACING
212   int rank = smpi_process_index();
213
214   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216   extra->type = TRACING_SEND;
217   extra->send_size = size;
218   extra->src = rank;
219   extra->dst = dst_traced;
220   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
223 #endif
224
225   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
226
227   log_timed_action (action, clock);
228
229   #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231 #endif
232
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process_simulated_elapsed();
240   MPI_Request request;
241
242   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
244
245 #ifdef HAVE_TRACING
246   int rank = smpi_process_index();
247   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249   extra->type = TRACING_ISEND;
250   extra->send_size = size;
251   extra->src = rank;
252   extra->dst = dst_traced;
253   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
256 #endif
257
258   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
259
260 #ifdef HAVE_TRACING
261   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
262   request->send = 1;
263 #endif
264
265   xbt_dynar_push(reqq[smpi_process_index()],&request);
266
267   log_timed_action (action, clock);
268 }
269
270 static void action_recv(const char *const *action) {
271   int from = atoi(action[2]);
272   double size=parse_double(action[3]);
273   double clock = smpi_process_simulated_elapsed();
274   MPI_Status status;
275
276   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279 #ifdef HAVE_TRACING
280   int rank = smpi_process_index();
281   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
282
283   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284   extra->type = TRACING_RECV;
285   extra->send_size = size;
286   extra->src = src_traced;
287   extra->dst = rank;
288   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
290 #endif
291
292   //unknow size from the receiver pov
293   if(size==-1){
294       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
295       size=status.count;
296   }
297
298   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
299
300 #ifdef HAVE_TRACING
301   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302   TRACE_smpi_recv(rank, src_traced, rank);
303 #endif
304
305   log_timed_action (action, clock);
306 }
307
308 static void action_Irecv(const char *const *action)
309 {
310   int from = atoi(action[2]);
311   double size=parse_double(action[3]);
312   double clock = smpi_process_simulated_elapsed();
313   MPI_Request request;
314
315   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
317
318 #ifdef HAVE_TRACING
319   int rank = smpi_process_index();
320   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322   extra->type = TRACING_IRECV;
323   extra->send_size = size;
324   extra->src = src_traced;
325   extra->dst = rank;
326   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
328 #endif
329   MPI_Status status;
330   //unknow size from the receiver pov
331   if(size==-1){
332       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
333       size=status.count;
334   }
335
336   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
340   request->recv = 1;
341 #endif
342   xbt_dynar_push(reqq[smpi_process_index()],&request);
343
344   log_timed_action (action, clock);
345 }
346
347 static void action_test(const char *const *action){
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Request request;
350   MPI_Status status;
351   int flag = TRUE;
352
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354   //if request is null here, this may mean that a previous test has succeeded 
355   //Different times in traced application and replayed version may lead to this 
356   //In this case, ignore the extra calls.
357   if(request){
358 #ifdef HAVE_TRACING
359   int rank = smpi_process_index();
360   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
361   extra->type=TRACING_TEST;
362   TRACE_smpi_testing_in(rank, extra);
363 #endif
364
365     flag = smpi_mpi_test(&request, &status);
366
367   XBT_DEBUG("MPI_Test result: %d", flag);
368   /* push back request in dynar to be caught by a subsequent wait. if the test
369    * did succeed, the request is now NULL.
370    */
371   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
372
373 #ifdef HAVE_TRACING
374   TRACE_smpi_testing_out(rank);
375 #endif
376   }
377   log_timed_action (action, clock);
378 }
379
380 static void action_wait(const char *const *action){
381   double clock = smpi_process_simulated_elapsed();
382   MPI_Request request;
383   MPI_Status status;
384
385   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
386       "action wait not preceded by any irecv or isend: %s",
387       xbt_str_join_array(action," "));
388   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
389
390   if (!request){
391     /* Assuming that the trace is well formed, this mean the comm might have
392      * been caught by a MPI_test. Then just return.
393      */
394     return;
395   }
396
397 #ifdef HAVE_TRACING
398   int rank = request->comm != MPI_COMM_NULL
399       ? smpi_comm_rank(request->comm)
400       : -1;
401
402   MPI_Group group = smpi_comm_group(request->comm);
403   int src_traced = smpi_group_rank(group, request->src);
404   int dst_traced = smpi_group_rank(group, request->dst);
405   int is_wait_for_receive = request->recv;
406   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
407   extra->type = TRACING_WAIT;
408   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
409 #endif
410   smpi_mpi_wait(&request, &status);
411 #ifdef HAVE_TRACING
412   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
413   if (is_wait_for_receive) {
414     TRACE_smpi_recv(rank, src_traced, dst_traced);
415   }
416 #endif
417
418   log_timed_action (action, clock);
419 }
420
421 static void action_waitall(const char *const *action){
422   double clock = smpi_process_simulated_elapsed();
423   int count_requests=0;
424   unsigned int i=0;
425
426   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
427
428   if (count_requests>0) {
429     MPI_Request requests[count_requests];
430     MPI_Status status[count_requests];
431
432     /*  The reqq is an array of dynars. Its index corresponds to the rank.
433      Thus each rank saves its own requests to the array request. */
434     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
435
436   #ifdef HAVE_TRACING
437    //save information from requests
438
439    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
440    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
441    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
442    for (i = 0; i < count_requests; i++) {
443     if(requests[i]){
444       int *asrc = xbt_new(int, 1);
445       int *adst = xbt_new(int, 1);
446       int *arecv = xbt_new(int, 1);
447       *asrc = requests[i]->src;
448       *adst = requests[i]->dst;
449       *arecv = requests[i]->recv;
450       xbt_dynar_insert_at(srcs, i, asrc);
451       xbt_dynar_insert_at(dsts, i, adst);
452       xbt_dynar_insert_at(recvs, i, arecv);
453       xbt_free(asrc);
454       xbt_free(adst);
455       xbt_free(arecv);
456     }else {
457       int *t = xbt_new(int, 1);
458       xbt_dynar_insert_at(srcs, i, t);
459       xbt_dynar_insert_at(dsts, i, t);
460       xbt_dynar_insert_at(recvs, i, t);
461       xbt_free(t);
462     }
463    }
464    int rank_traced = smpi_process_index();
465    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
466    extra->type = TRACING_WAITALL;
467    extra->send_size=count_requests;
468    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
469  #endif
470
471     smpi_mpi_waitall(count_requests, requests, status);
472
473   #ifdef HAVE_TRACING
474    for (i = 0; i < count_requests; i++) {
475     int src_traced, dst_traced, is_wait_for_receive;
476     xbt_dynar_get_cpy(srcs, i, &src_traced);
477     xbt_dynar_get_cpy(dsts, i, &dst_traced);
478     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
479     if (is_wait_for_receive) {
480       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
481     }
482    }
483    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
484    //clean-up of dynars
485    xbt_dynar_free(&srcs);
486    xbt_dynar_free(&dsts);
487    xbt_dynar_free(&recvs);
488   #endif
489
490    int freedrank=smpi_process_index();
491    xbt_dynar_free_container(&(reqq[freedrank]));
492    reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
493   }
494   log_timed_action (action, clock);
495 }
496
497 static void action_barrier(const char *const *action){
498   double clock = smpi_process_simulated_elapsed();
499 #ifdef HAVE_TRACING
500   int rank = smpi_process_index();
501   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
502   extra->type = TRACING_BARRIER;
503   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
504 #endif
505   mpi_coll_barrier_fun(MPI_COMM_WORLD);
506 #ifdef HAVE_TRACING
507   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
508 #endif
509
510   log_timed_action (action, clock);
511 }
512
513
514 static void action_bcast(const char *const *action)
515 {
516   double size = parse_double(action[2]);
517   double clock = smpi_process_simulated_elapsed();
518   int root=0;
519   /*
520    * Initialize MPI_CURRENT_TYPE in order to decrease
521    * the number of the checks
522    * */
523   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
524
525   if(action[3]) {
526     root= atoi(action[3]);
527     if(action[4]) {
528       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
529     }
530   }
531
532 #ifdef HAVE_TRACING
533   int rank = smpi_process_index();
534   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
535
536   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
537   extra->type = TRACING_BCAST;
538   extra->send_size = size;
539   extra->root = root_traced;
540   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
541   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
542
543 #endif
544     void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
545   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
546 #ifdef HAVE_TRACING
547   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
548 #endif
549   log_timed_action (action, clock);
550 }
551
552 static void action_reduce(const char *const *action)
553 {
554   double comm_size = parse_double(action[2]);
555   double comp_size = parse_double(action[3]);
556   double clock = smpi_process_simulated_elapsed();
557   int root=0;
558   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
559
560   if(action[4]) {
561     root= atoi(action[4]);
562     if(action[5]) {
563       MPI_CURRENT_TYPE=decode_datatype(action[5]);
564     }
565   }
566   
567   
568
569 #ifdef HAVE_TRACING
570   int rank = smpi_process_index();
571   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
572   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573   extra->type = TRACING_REDUCE;
574   extra->send_size = comm_size;
575   extra->comp_size = comp_size;
576   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
577   extra->root = root_traced;
578
579   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
580 #endif
581     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
582     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
583    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
584    smpi_execute_flops(comp_size);
585 #ifdef HAVE_TRACING
586   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
587 #endif
588   log_timed_action (action, clock);
589 }
590
591 static void action_allReduce(const char *const *action) {
592   double comm_size = parse_double(action[2]);
593   double comp_size = parse_double(action[3]);
594
595   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
596   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
597
598   double clock = smpi_process_simulated_elapsed();
599 #ifdef HAVE_TRACING
600   int rank = smpi_process_index();
601   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
602   extra->type = TRACING_ALLREDUCE;
603   extra->send_size = comm_size;
604   extra->comp_size = comp_size;
605   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
606
607   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
608 #endif
609     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
610     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
611   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
612   smpi_execute_flops(comp_size);
613 #ifdef HAVE_TRACING
614   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
615 #endif
616   log_timed_action (action, clock);
617 }
618
619 static void action_allToAll(const char *const *action) {
620   double clock = smpi_process_simulated_elapsed();
621   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
622   int send_size = parse_double(action[2]);
623   int recv_size = parse_double(action[3]);
624   MPI_Datatype MPI_CURRENT_TYPE2;
625
626   if(action[4]) {
627     MPI_CURRENT_TYPE=decode_datatype(action[4]);
628     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
629   }
630   else {
631     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
632     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
633   }
634   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
636
637 #ifdef HAVE_TRACING
638   int rank = smpi_process_index();
639   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
640   extra->type = TRACING_ALLTOALL;
641   extra->send_size = send_size;
642   extra->recv_size = recv_size;
643   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
644   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
645
646   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
647 #endif
648
649   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
650
651 #ifdef HAVE_TRACING
652   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
653 #endif
654   log_timed_action (action, clock);
655
656 }
657
658
659 static void action_gather(const char *const *action) {
660   /*
661  The structure of the gather action for the rank 0 (total 4 processes) 
662  is the following:   
663  0 gather 68 68 0 0 0
664
665   where: 
666   1) 68 is the sendcounts
667   2) 68 is the recvcounts
668   3) 0 is the root node
669   4) 0 is the send datatype id, see decode_datatype()
670   5) 0 is the recv datatype id, see decode_datatype()
671   */
672   double clock = smpi_process_simulated_elapsed();
673   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
674   int send_size = parse_double(action[2]);
675   int recv_size = parse_double(action[3]);
676   MPI_Datatype MPI_CURRENT_TYPE2;
677   if(action[5]) {
678     MPI_CURRENT_TYPE=decode_datatype(action[5]);
679     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
680   } else {
681     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
682     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
683   }
684   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
685   void *recv = NULL;
686
687   int root=atoi(action[4]);
688   int rank = smpi_comm_rank(MPI_COMM_WORLD);
689
690   if(rank==root)
691     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
692
693 #ifdef HAVE_TRACING
694   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
695   extra->type = TRACING_GATHER;
696   extra->send_size = send_size;
697   extra->recv_size = recv_size;
698   extra->root = root;
699   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
700   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
701
702   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
703 #endif
704   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
705                 recv, recv_size, MPI_CURRENT_TYPE2,
706                 root, MPI_COMM_WORLD);
707
708 #ifdef HAVE_TRACING
709   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
710 #endif
711   log_timed_action (action, clock);
712
713 }
714
715
716
717 static void action_gatherv(const char *const *action) {
718   /*
719  The structure of the gatherv action for the rank 0 (total 4 processes)
720  is the following:
721  0 gather 68 68 10 10 10 0 0 0
722
723   where:
724   1) 68 is the sendcount
725   2) 68 10 10 10 is the recvcounts
726   3) 0 is the root node
727   4) 0 is the send datatype id, see decode_datatype()
728   5) 0 is the recv datatype id, see decode_datatype()
729   */
730   double clock = smpi_process_simulated_elapsed();
731   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
732   int send_size = parse_double(action[2]);
733   int *disps = xbt_new0(int, comm_size);
734   int *recvcounts = xbt_new0(int, comm_size);
735   int i=0,recv_sum=0;
736
737   MPI_Datatype MPI_CURRENT_TYPE2;
738   if(action[4+comm_size]) {
739     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
740     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
741   } else {
742     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
743     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
744   }
745   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
746   void *recv = NULL;
747   for(i=0;i<comm_size;i++) {
748     recvcounts[i] = atoi(action[i+3]);
749     recv_sum=recv_sum+recvcounts[i];
750     disps[i] = 0;
751   }
752
753   int root=atoi(action[3+comm_size]);
754   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
755
756   if(rank==root)
757     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
758
759 #ifdef HAVE_TRACING
760   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
761   extra->type = TRACING_GATHERV;
762   extra->send_size = send_size;
763   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
764   for(i=0; i< comm_size; i++)//copy data to avoid bad free
765     extra->recvcounts[i] = recvcounts[i];
766   extra->root = root;
767   extra->num_processes = comm_size;
768   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
769   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
770
771   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
772 #endif
773 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
774                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
775                 root, MPI_COMM_WORLD);
776
777 #ifdef HAVE_TRACING
778   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
779 #endif
780
781   log_timed_action (action, clock);
782   xbt_free(recvcounts);
783   xbt_free(disps);
784 }
785
786 static void action_reducescatter(const char *const *action) {
787
788     /*
789  The structure of the reducescatter action for the rank 0 (total 4 processes) 
790  is the following:   
791 0 reduceScatter 275427 275427 275427 204020 11346849 0
792
793   where: 
794   1) The first four values after the name of the action declare the recvcounts array
795   2) The value 11346849 is the amount of instructions
796   3) The last value corresponds to the datatype, see decode_datatype().
797
798   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
799
800    */
801
802   double clock = smpi_process_simulated_elapsed();
803   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
804   int comp_size = parse_double(action[2+comm_size]);
805   int *recvcounts = xbt_new0(int, comm_size);  
806   int *disps = xbt_new0(int, comm_size);  
807   int i=0;
808   int rank = smpi_process_index();
809   int size = 0;
810   if(action[3+comm_size])
811     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
812   else
813     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
814
815   for(i=0;i<comm_size;i++) {
816     recvcounts[i] = atoi(action[i+2]);
817     disps[i] = 0;
818     size+=recvcounts[i];
819   }
820
821 #ifdef HAVE_TRACING
822   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
823   extra->type = TRACING_REDUCE_SCATTER;
824   extra->send_size = 0;
825   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
826   for(i=0; i< comm_size; i++)//copy data to avoid bad free
827     extra->recvcounts[i] = recvcounts[i];
828   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
829   extra->comp_size = comp_size;
830   extra->num_processes = comm_size;
831
832   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
833 #endif
834   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
835    void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
836    
837    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
838        MPI_COMM_WORLD);
839    smpi_execute_flops(comp_size);
840
841
842 #ifdef HAVE_TRACING
843   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
844 #endif
845   xbt_free(recvcounts);
846   xbt_free(disps);
847   log_timed_action (action, clock);
848 }
849
850
851 static void action_allgatherv(const char *const *action) {
852
853   /*
854  The structure of the allgatherv action for the rank 0 (total 4 processes) 
855  is the following:   
856 0 allGatherV 275427 275427 275427 275427 204020
857
858   where: 
859   1) 275427 is the sendcount
860   2) The next four elements declare the recvcounts array
861   3) No more values mean that the datatype for sent and receive buffer
862   is the default one, see decode_datatype().
863
864    */
865
866   double clock = smpi_process_simulated_elapsed();
867
868   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
869   int i=0;
870   int sendcount=atoi(action[2]);
871   int *recvcounts = xbt_new0(int, comm_size);  
872   int *disps = xbt_new0(int, comm_size);  
873   int recv_sum=0;  
874   MPI_Datatype MPI_CURRENT_TYPE2;
875
876   if(action[3+comm_size]) {
877     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
878     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
879   } else {
880     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
881     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
882   }
883   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
884
885   for(i=0;i<comm_size;i++) {
886     recvcounts[i] = atoi(action[i+3]);
887     recv_sum=recv_sum+recvcounts[i];
888   }
889   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
890
891 #ifdef HAVE_TRACING
892   int rank = smpi_process_index();
893   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
894   extra->type = TRACING_ALLGATHERV;
895   extra->send_size = sendcount;
896   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
897   for(i=0; i< comm_size; i++)//copy data to avoid bad free
898     extra->recvcounts[i] = recvcounts[i];
899   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
900   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
901   extra->num_processes = comm_size;
902
903   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
904 #endif
905
906   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
907
908 #ifdef HAVE_TRACING
909   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
910 #endif
911
912   log_timed_action (action, clock);
913   xbt_free(recvcounts);
914   xbt_free(disps);
915 }
916
917
918 static void action_allToAllv(const char *const *action) {
919   /*
920  The structure of the allToAllV action for the rank 0 (total 4 processes) 
921  is the following:   
922   0 allToAllV 100 1 7 10 12 100 1 70 10 5
923
924   where: 
925   1) 100 is the size of the send buffer *sizeof(int),
926   2) 1 7 10 12 is the sendcounts array
927   3) 100*sizeof(int) is the size of the receiver buffer
928   4)  1 70 10 5 is the recvcounts array
929
930    */
931
932
933   double clock = smpi_process_simulated_elapsed();
934
935   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
936   int send_buf_size=0,recv_buf_size=0,i=0;
937   int *sendcounts = xbt_new0(int, comm_size);  
938   int *recvcounts = xbt_new0(int, comm_size);  
939   int *senddisps = xbt_new0(int, comm_size);  
940   int *recvdisps = xbt_new0(int, comm_size);  
941
942   MPI_Datatype MPI_CURRENT_TYPE2;
943
944   send_buf_size=parse_double(action[2]);
945   recv_buf_size=parse_double(action[3+comm_size]);
946   if(action[4+2*comm_size]) {
947     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
948     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
949   }
950   else {
951       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
952       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
953   }
954
955   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
956   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
957
958   for(i=0;i<comm_size;i++) {
959     sendcounts[i] = atoi(action[i+3]);
960     recvcounts[i] = atoi(action[i+4+comm_size]);
961   }
962
963
964 #ifdef HAVE_TRACING
965   int rank = smpi_process_index();
966   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
967   extra->type = TRACING_ALLTOALLV;
968   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
969   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
970   extra->num_processes = comm_size;
971
972   for(i=0; i< comm_size; i++){//copy data to avoid bad free
973     extra->send_size += sendcounts[i];
974     extra->sendcounts[i] = sendcounts[i];
975     extra->recv_size += recvcounts[i];
976     extra->recvcounts[i] = recvcounts[i];
977   }
978   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
979   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
980
981   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
982 #endif
983   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
984                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
985                                MPI_COMM_WORLD);
986 #ifdef HAVE_TRACING
987   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
988 #endif
989
990   log_timed_action (action, clock);
991   xbt_free(sendcounts);
992   xbt_free(recvcounts);
993   xbt_free(senddisps);
994   xbt_free(recvdisps);
995 }
996
997 void smpi_replay_init(int *argc, char***argv){
998   smpi_process_init(argc, argv);
999   smpi_process_mark_as_initialized();
1000   smpi_process_set_replaying(1);
1001 #ifdef HAVE_TRACING
1002   int rank = smpi_process_index();
1003   TRACE_smpi_init(rank);
1004   TRACE_smpi_computing_init(rank);
1005   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1006   extra->type = TRACING_INIT;
1007   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1008   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1009 #endif
1010
1011   if (!smpi_process_index()){
1012     _xbt_replay_action_init();
1013     xbt_replay_action_register("init",       action_init);
1014     xbt_replay_action_register("finalize",   action_finalize);
1015     xbt_replay_action_register("comm_size",  action_comm_size);
1016     xbt_replay_action_register("comm_split", action_comm_split);
1017     xbt_replay_action_register("comm_dup",   action_comm_dup);
1018     xbt_replay_action_register("send",       action_send);
1019     xbt_replay_action_register("Isend",      action_Isend);
1020     xbt_replay_action_register("recv",       action_recv);
1021     xbt_replay_action_register("Irecv",      action_Irecv);
1022     xbt_replay_action_register("test",       action_test);
1023     xbt_replay_action_register("wait",       action_wait);
1024     xbt_replay_action_register("waitAll",    action_waitall);
1025     xbt_replay_action_register("barrier",    action_barrier);
1026     xbt_replay_action_register("bcast",      action_bcast);
1027     xbt_replay_action_register("reduce",     action_reduce);
1028     xbt_replay_action_register("allReduce",  action_allReduce);
1029     xbt_replay_action_register("allToAll",   action_allToAll);
1030     xbt_replay_action_register("allToAllV",  action_allToAllv);
1031     xbt_replay_action_register("gather",  action_gather);
1032     xbt_replay_action_register("gatherV",  action_gatherv);
1033     xbt_replay_action_register("allGatherV",  action_allgatherv);
1034     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1035     xbt_replay_action_register("compute",    action_compute);
1036   }
1037   
1038   //if we have a delayed start, sleep here.
1039   if(*argc>2){
1040     char *endptr;
1041     double value = strtod((*argv)[2], &endptr);
1042     if (*endptr != '\0')
1043       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1044     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1045     smpi_execute_flops(value);
1046   }
1047   xbt_replay_action_runner(*argc, *argv);
1048 }
1049
1050 int smpi_replay_finalize(){
1051   double sim_time= 1.;
1052   /* One active process will stop. Decrease the counter*/
1053   XBT_DEBUG("There are %lu elements in reqq[*]",
1054             xbt_dynar_length(reqq[smpi_process_index()]));
1055   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1056     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1057     MPI_Request requests[count_requests];
1058     MPI_Status status[count_requests];
1059     unsigned int i;
1060
1061     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1062     smpi_mpi_waitall(count_requests, requests, status);
1063     active_processes--;
1064   } else {
1065     active_processes--;
1066   }
1067
1068   if(!active_processes){
1069     /* Last process alive speaking */
1070     /* end the simulated timer */
1071     sim_time = smpi_process_simulated_elapsed();
1072   }
1073   
1074
1075   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1076
1077   if(!active_processes){
1078     XBT_INFO("Simulation time %f", sim_time);
1079     _xbt_replay_action_exit();
1080     xbt_free(sendbuffer);
1081     xbt_free(recvbuffer);
1082     xbt_free(reqq);
1083     reqq = NULL;
1084   }
1085   
1086
1087 #ifdef HAVE_TRACING
1088   int rank = smpi_process_index();
1089   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1090   extra->type = TRACING_FINALIZE;
1091   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1092 #endif
1093   smpi_process_finalize();
1094 #ifdef HAVE_TRACING
1095   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1096   TRACE_smpi_finalize(smpi_process_index());
1097 #endif
1098   smpi_process_destroy();
1099   return MPI_SUCCESS;
1100 }