Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Apply patch suggested by fabien Chaix. This avoids issues with reqq being deleted...
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36   if (!smpi_process_get_replaying())
37         return xbt_malloc(size);
38   if (sendbuffer_size<size){
39     sendbuffer=xbt_realloc(sendbuffer,size);
40     sendbuffer_size=size;
41   }
42   return sendbuffer;
43 }
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46   if (!smpi_process_get_replaying())
47         return xbt_malloc(size);
48   if (recvbuffer_size<size){
49     recvbuffer=xbt_realloc(recvbuffer,size);
50     recvbuffer_size=size;
51   }
52   return sendbuffer;
53 }
54
55 void smpi_free_tmp_buffer(void* buf){
56   if (!smpi_process_get_replaying())
57     xbt_free(buf);
58 }
59
60 /* Helper function */
61 static double parse_double(const char *string)
62 {
63   double value;
64   char *endptr;
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static MPI_Datatype decode_datatype(const char *const action)
72 {
73 // Declared datatypes,
74
75   switch(atoi(action))
76   {
77     case 0:
78       MPI_CURRENT_TYPE=MPI_DOUBLE;
79       break;
80     case 1:
81       MPI_CURRENT_TYPE=MPI_INT;
82       break;
83     case 2:
84       MPI_CURRENT_TYPE=MPI_CHAR;
85       break;
86     case 3:
87       MPI_CURRENT_TYPE=MPI_SHORT;
88       break;
89     case 4:
90       MPI_CURRENT_TYPE=MPI_LONG;
91       break;
92     case 5:
93       MPI_CURRENT_TYPE=MPI_FLOAT;
94       break;
95     case 6:
96       MPI_CURRENT_TYPE=MPI_BYTE;
97       break;
98     default:
99       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
100
101   }
102    return MPI_CURRENT_TYPE;
103 }
104
105
106 const char* encode_datatype(MPI_Datatype datatype)
107 {
108
109   //default type for output is set to MPI_BYTE
110   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111   if (datatype==MPI_BYTE){
112       return "";
113   }
114   if(datatype==MPI_DOUBLE)
115       return "0";
116   if(datatype==MPI_INT)
117       return "1";
118   if(datatype==MPI_CHAR)
119       return "2";
120   if(datatype==MPI_SHORT)
121       return "3";
122   if(datatype==MPI_LONG)
123     return "4";
124   if(datatype==MPI_FLOAT)
125       return "5";
126
127   // default - not implemented.
128   // do not warn here as we pass in this function even for other trace formats
129   return "-1";
130 }
131
132 static void action_init(const char *const *action)
133 {
134   int i;
135   XBT_DEBUG("Initialize the counters");
136
137   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
138   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
139
140   /* start a simulated timer */
141   smpi_process_simulated_start();
142   /*initialize the number of active processes */
143   active_processes = smpi_process_count();
144
145   if (!reqq) {
146     reqq=xbt_new0(xbt_dynar_t,active_processes);
147
148     for(i=0;i<active_processes;i++){
149       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
150     }
151   }
152 }
153
154 static void action_finalize(const char *const *action)
155 {
156 }
157
158 static void action_comm_size(const char *const *action)
159 {
160   double clock = smpi_process_simulated_elapsed();
161
162   communicator_size = parse_double(action[2]);
163   log_timed_action (action, clock);
164 }
165
166 static void action_comm_split(const char *const *action)
167 {
168   double clock = smpi_process_simulated_elapsed();
169
170   log_timed_action (action, clock);
171 }
172
173 static void action_comm_dup(const char *const *action)
174 {
175   double clock = smpi_process_simulated_elapsed();
176
177   log_timed_action (action, clock);
178 }
179
180 static void action_compute(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183   double flops= parse_double(action[2]);
184 #ifdef HAVE_TRACING
185   int rank = smpi_process_index();
186   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187   extra->type=TRACING_COMPUTING;
188   extra->comp_size=flops;
189   TRACE_smpi_computing_in(rank, extra);
190 #endif
191   smpi_execute_flops(flops);
192 #ifdef HAVE_TRACING
193   TRACE_smpi_computing_out(rank);
194 #endif
195
196   log_timed_action (action, clock);
197 }
198
199 static void action_send(const char *const *action)
200 {
201   int to = atoi(action[2]);
202   double size=parse_double(action[3]);
203   double clock = smpi_process_simulated_elapsed();
204
205   if(action[4]) {
206     MPI_CURRENT_TYPE=decode_datatype(action[4]);
207   } else {
208     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
209   }
210
211 #ifdef HAVE_TRACING
212   int rank = smpi_process_index();
213
214   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216   extra->type = TRACING_SEND;
217   extra->send_size = size;
218   extra->src = rank;
219   extra->dst = dst_traced;
220   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
223 #endif
224
225   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
226
227   log_timed_action (action, clock);
228
229   #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231 #endif
232
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process_simulated_elapsed();
240   MPI_Request request;
241
242   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
244
245 #ifdef HAVE_TRACING
246   int rank = smpi_process_index();
247   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249   extra->type = TRACING_ISEND;
250   extra->send_size = size;
251   extra->src = rank;
252   extra->dst = dst_traced;
253   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
256 #endif
257
258   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
259
260 #ifdef HAVE_TRACING
261   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
262   request->send = 1;
263 #endif
264
265   xbt_dynar_push(reqq[smpi_process_index()],&request);
266
267   log_timed_action (action, clock);
268 }
269
270 static void action_recv(const char *const *action) {
271   int from = atoi(action[2]);
272   double size=parse_double(action[3]);
273   double clock = smpi_process_simulated_elapsed();
274   MPI_Status status;
275
276   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279 #ifdef HAVE_TRACING
280   int rank = smpi_process_index();
281   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
282
283   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284   extra->type = TRACING_RECV;
285   extra->send_size = size;
286   extra->src = src_traced;
287   extra->dst = rank;
288   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
290 #endif
291
292   //unknow size from the receiver pov
293   if(size==-1){
294       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
295       size=status.count;
296   }
297
298   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
299
300 #ifdef HAVE_TRACING
301   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302   TRACE_smpi_recv(rank, src_traced, rank);
303 #endif
304
305   log_timed_action (action, clock);
306 }
307
308 static void action_Irecv(const char *const *action)
309 {
310   int from = atoi(action[2]);
311   double size=parse_double(action[3]);
312   double clock = smpi_process_simulated_elapsed();
313   MPI_Request request;
314
315   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
317
318 #ifdef HAVE_TRACING
319   int rank = smpi_process_index();
320   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322   extra->type = TRACING_IRECV;
323   extra->send_size = size;
324   extra->src = src_traced;
325   extra->dst = rank;
326   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
328 #endif
329   MPI_Status status;
330   //unknow size from the receiver pov
331   if(size==-1){
332       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
333       size=status.count;
334   }
335
336   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
340   request->recv = 1;
341 #endif
342   xbt_dynar_push(reqq[smpi_process_index()],&request);
343
344   log_timed_action (action, clock);
345 }
346
347 static void action_test(const char *const *action){
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Request request;
350   MPI_Status status;
351   int flag = TRUE;
352
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354   xbt_assert(request != NULL, "found null request in reqq");
355
356 #ifdef HAVE_TRACING
357   int rank = smpi_process_index();
358   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359   extra->type=TRACING_TEST;
360   TRACE_smpi_testing_in(rank, extra);
361 #endif
362   flag = smpi_mpi_test(&request, &status);
363   XBT_DEBUG("MPI_Test result: %d", flag);
364   /* push back request in dynar to be caught by a subsequent wait. if the test
365    * did succeed, the request is now NULL.
366    */
367   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
368
369 #ifdef HAVE_TRACING
370   TRACE_smpi_testing_out(rank);
371 #endif
372
373   log_timed_action (action, clock);
374 }
375
376 static void action_wait(const char *const *action){
377   double clock = smpi_process_simulated_elapsed();
378   MPI_Request request;
379   MPI_Status status;
380
381   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
382       "action wait not preceded by any irecv or isend: %s",
383       xbt_str_join_array(action," "));
384   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
385
386   if (!request){
387     /* Assuming that the trace is well formed, this mean the comm might have
388      * been caught by a MPI_test. Then just return.
389      */
390     return;
391   }
392
393 #ifdef HAVE_TRACING
394   int rank = request->comm != MPI_COMM_NULL
395       ? smpi_comm_rank(request->comm)
396       : -1;
397
398   MPI_Group group = smpi_comm_group(request->comm);
399   int src_traced = smpi_group_rank(group, request->src);
400   int dst_traced = smpi_group_rank(group, request->dst);
401   int is_wait_for_receive = request->recv;
402   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
403   extra->type = TRACING_WAIT;
404   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
405 #endif
406   smpi_mpi_wait(&request, &status);
407 #ifdef HAVE_TRACING
408   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
409   if (is_wait_for_receive) {
410     TRACE_smpi_recv(rank, src_traced, dst_traced);
411   }
412 #endif
413
414   log_timed_action (action, clock);
415 }
416
417 static void action_waitall(const char *const *action){
418   double clock = smpi_process_simulated_elapsed();
419   int count_requests=0;
420   unsigned int i=0;
421
422   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
423
424   if (count_requests>0) {
425     MPI_Request requests[count_requests];
426     MPI_Status status[count_requests];
427
428     /*  The reqq is an array of dynars. Its index corresponds to the rank.
429      Thus each rank saves its own requests to the array request. */
430     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
431
432   #ifdef HAVE_TRACING
433    //save information from requests
434
435    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
436    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
437    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
438    for (i = 0; i < count_requests; i++) {
439     if(requests[i]){
440       int *asrc = xbt_new(int, 1);
441       int *adst = xbt_new(int, 1);
442       int *arecv = xbt_new(int, 1);
443       *asrc = requests[i]->src;
444       *adst = requests[i]->dst;
445       *arecv = requests[i]->recv;
446       xbt_dynar_insert_at(srcs, i, asrc);
447       xbt_dynar_insert_at(dsts, i, adst);
448       xbt_dynar_insert_at(recvs, i, arecv);
449       xbt_free(asrc);
450       xbt_free(adst);
451       xbt_free(arecv);
452     }else {
453       int *t = xbt_new(int, 1);
454       xbt_dynar_insert_at(srcs, i, t);
455       xbt_dynar_insert_at(dsts, i, t);
456       xbt_dynar_insert_at(recvs, i, t);
457       xbt_free(t);
458     }
459    }
460    int rank_traced = smpi_process_index();
461    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
462    extra->type = TRACING_WAITALL;
463    extra->send_size=count_requests;
464    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
465  #endif
466
467     smpi_mpi_waitall(count_requests, requests, status);
468
469   #ifdef HAVE_TRACING
470    for (i = 0; i < count_requests; i++) {
471     int src_traced, dst_traced, is_wait_for_receive;
472     xbt_dynar_get_cpy(srcs, i, &src_traced);
473     xbt_dynar_get_cpy(dsts, i, &dst_traced);
474     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
475     if (is_wait_for_receive) {
476       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
477     }
478    }
479    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
480    //clean-up of dynars
481    xbt_dynar_free(&srcs);
482    xbt_dynar_free(&dsts);
483    xbt_dynar_free(&recvs);
484   #endif
485
486    int freedrank=smpi_process_index();
487    xbt_dynar_free_container(&(reqq[freedrank]));
488    reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
489   }
490   log_timed_action (action, clock);
491 }
492
493 static void action_barrier(const char *const *action){
494   double clock = smpi_process_simulated_elapsed();
495 #ifdef HAVE_TRACING
496   int rank = smpi_process_index();
497   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
498   extra->type = TRACING_BARRIER;
499   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
500 #endif
501   mpi_coll_barrier_fun(MPI_COMM_WORLD);
502 #ifdef HAVE_TRACING
503   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
504 #endif
505
506   log_timed_action (action, clock);
507 }
508
509
510 static void action_bcast(const char *const *action)
511 {
512   double size = parse_double(action[2]);
513   double clock = smpi_process_simulated_elapsed();
514   int root=0;
515   /*
516    * Initialize MPI_CURRENT_TYPE in order to decrease
517    * the number of the checks
518    * */
519   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
520
521   if(action[3]) {
522     root= atoi(action[3]);
523     if(action[4]) {
524       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
525     }
526   }
527
528 #ifdef HAVE_TRACING
529   int rank = smpi_process_index();
530   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
531
532   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
533   extra->type = TRACING_BCAST;
534   extra->send_size = size;
535   extra->root = root_traced;
536   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
537   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
538
539 #endif
540     void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
541   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
542 #ifdef HAVE_TRACING
543   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
544 #endif
545   log_timed_action (action, clock);
546 }
547
548 static void action_reduce(const char *const *action)
549 {
550   double comm_size = parse_double(action[2]);
551   double comp_size = parse_double(action[3]);
552   double clock = smpi_process_simulated_elapsed();
553   int root=0;
554   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
555
556   if(action[4]) {
557     root= atoi(action[4]);
558     if(action[5]) {
559       MPI_CURRENT_TYPE=decode_datatype(action[5]);
560     }
561   }
562   
563   
564
565 #ifdef HAVE_TRACING
566   int rank = smpi_process_index();
567   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
568   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
569   extra->type = TRACING_REDUCE;
570   extra->send_size = comm_size;
571   extra->comp_size = comp_size;
572   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
573   extra->root = root_traced;
574
575   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
576 #endif
577     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
578     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
579    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
580    smpi_execute_flops(comp_size);
581 #ifdef HAVE_TRACING
582   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
583 #endif
584   log_timed_action (action, clock);
585 }
586
587 static void action_allReduce(const char *const *action) {
588   double comm_size = parse_double(action[2]);
589   double comp_size = parse_double(action[3]);
590
591   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
592   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
593
594   double clock = smpi_process_simulated_elapsed();
595 #ifdef HAVE_TRACING
596   int rank = smpi_process_index();
597   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
598   extra->type = TRACING_ALLREDUCE;
599   extra->send_size = comm_size;
600   extra->comp_size = comp_size;
601   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
602
603   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
604 #endif
605     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
606     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
607   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
608   smpi_execute_flops(comp_size);
609 #ifdef HAVE_TRACING
610   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
611 #endif
612   log_timed_action (action, clock);
613 }
614
615 static void action_allToAll(const char *const *action) {
616   double clock = smpi_process_simulated_elapsed();
617   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
618   int send_size = parse_double(action[2]);
619   int recv_size = parse_double(action[3]);
620   MPI_Datatype MPI_CURRENT_TYPE2;
621
622   if(action[4]) {
623     MPI_CURRENT_TYPE=decode_datatype(action[4]);
624     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
625   }
626   else {
627     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
628     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
629   }
630   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
631   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
632
633 #ifdef HAVE_TRACING
634   int rank = smpi_process_index();
635   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
636   extra->type = TRACING_ALLTOALL;
637   extra->send_size = send_size;
638   extra->recv_size = recv_size;
639   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
640   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
641
642   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
643 #endif
644
645   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
646
647 #ifdef HAVE_TRACING
648   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
649 #endif
650   log_timed_action (action, clock);
651
652 }
653
654
655 static void action_gather(const char *const *action) {
656   /*
657  The structure of the gather action for the rank 0 (total 4 processes) 
658  is the following:   
659  0 gather 68 68 0 0 0
660
661   where: 
662   1) 68 is the sendcounts
663   2) 68 is the recvcounts
664   3) 0 is the root node
665   4) 0 is the send datatype id, see decode_datatype()
666   5) 0 is the recv datatype id, see decode_datatype()
667   */
668   double clock = smpi_process_simulated_elapsed();
669   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
670   int send_size = parse_double(action[2]);
671   int recv_size = parse_double(action[3]);
672   MPI_Datatype MPI_CURRENT_TYPE2;
673   if(action[5]) {
674     MPI_CURRENT_TYPE=decode_datatype(action[5]);
675     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
676   } else {
677     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
678     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
679   }
680   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
681   void *recv = NULL;
682
683   int root=atoi(action[4]);
684   int rank = smpi_comm_rank(MPI_COMM_WORLD);
685
686   if(rank==root)
687     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
688
689 #ifdef HAVE_TRACING
690   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
691   extra->type = TRACING_GATHER;
692   extra->send_size = send_size;
693   extra->recv_size = recv_size;
694   extra->root = root;
695   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
696   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
697
698   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
699 #endif
700   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
701                 recv, recv_size, MPI_CURRENT_TYPE2,
702                 root, MPI_COMM_WORLD);
703
704 #ifdef HAVE_TRACING
705   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
706 #endif
707   log_timed_action (action, clock);
708
709 }
710
711
712
713 static void action_gatherv(const char *const *action) {
714   /*
715  The structure of the gatherv action for the rank 0 (total 4 processes)
716  is the following:
717  0 gather 68 68 10 10 10 0 0 0
718
719   where:
720   1) 68 is the sendcount
721   2) 68 10 10 10 is the recvcounts
722   3) 0 is the root node
723   4) 0 is the send datatype id, see decode_datatype()
724   5) 0 is the recv datatype id, see decode_datatype()
725   */
726   double clock = smpi_process_simulated_elapsed();
727   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
728   int send_size = parse_double(action[2]);
729   int *disps = xbt_new0(int, comm_size);
730   int *recvcounts = xbt_new0(int, comm_size);
731   int i=0,recv_sum=0;
732
733   MPI_Datatype MPI_CURRENT_TYPE2;
734   if(action[4+comm_size]) {
735     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
736     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
737   } else {
738     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
739     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
740   }
741   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
742   void *recv = NULL;
743   for(i=0;i<comm_size;i++) {
744     recvcounts[i] = atoi(action[i+3]);
745     recv_sum=recv_sum+recvcounts[i];
746     disps[i] = 0;
747   }
748
749   int root=atoi(action[3+comm_size]);
750   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
751
752   if(rank==root)
753     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
754
755 #ifdef HAVE_TRACING
756   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
757   extra->type = TRACING_GATHERV;
758   extra->send_size = send_size;
759   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
760   for(i=0; i< comm_size; i++)//copy data to avoid bad free
761     extra->recvcounts[i] = recvcounts[i];
762   extra->root = root;
763   extra->num_processes = comm_size;
764   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
765   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
766
767   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
768 #endif
769 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
770                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
771                 root, MPI_COMM_WORLD);
772
773 #ifdef HAVE_TRACING
774   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
775 #endif
776
777   log_timed_action (action, clock);
778   xbt_free(recvcounts);
779   xbt_free(disps);
780 }
781
782 static void action_reducescatter(const char *const *action) {
783
784     /*
785  The structure of the reducescatter action for the rank 0 (total 4 processes) 
786  is the following:   
787 0 reduceScatter 275427 275427 275427 204020 11346849 0
788
789   where: 
790   1) The first four values after the name of the action declare the recvcounts array
791   2) The value 11346849 is the amount of instructions
792   3) The last value corresponds to the datatype, see decode_datatype().
793
794   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
795
796    */
797
798   double clock = smpi_process_simulated_elapsed();
799   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
800   int comp_size = parse_double(action[2+comm_size]);
801   int *recvcounts = xbt_new0(int, comm_size);  
802   int *disps = xbt_new0(int, comm_size);  
803   int i=0;
804   int rank = smpi_process_index();
805   int size = 0;
806   if(action[3+comm_size])
807     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
808   else
809     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
810
811   for(i=0;i<comm_size;i++) {
812     recvcounts[i] = atoi(action[i+2]);
813     disps[i] = 0;
814     size+=recvcounts[i];
815   }
816
817 #ifdef HAVE_TRACING
818   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
819   extra->type = TRACING_REDUCE_SCATTER;
820   extra->send_size = 0;
821   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
822   for(i=0; i< comm_size; i++)//copy data to avoid bad free
823     extra->recvcounts[i] = recvcounts[i];
824   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
825   extra->comp_size = comp_size;
826   extra->num_processes = comm_size;
827
828   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
829 #endif
830   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
831    void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
832    
833    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
834        MPI_COMM_WORLD);
835    smpi_execute_flops(comp_size);
836
837
838 #ifdef HAVE_TRACING
839   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
840 #endif
841   xbt_free(recvcounts);
842   xbt_free(disps);
843   log_timed_action (action, clock);
844 }
845
846
847 static void action_allgatherv(const char *const *action) {
848
849   /*
850  The structure of the allgatherv action for the rank 0 (total 4 processes) 
851  is the following:   
852 0 allGatherV 275427 275427 275427 275427 204020
853
854   where: 
855   1) 275427 is the sendcount
856   2) The next four elements declare the recvcounts array
857   3) No more values mean that the datatype for sent and receive buffer
858   is the default one, see decode_datatype().
859
860    */
861
862   double clock = smpi_process_simulated_elapsed();
863
864   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
865   int i=0;
866   int sendcount=atoi(action[2]);
867   int *recvcounts = xbt_new0(int, comm_size);  
868   int *disps = xbt_new0(int, comm_size);  
869   int recv_sum=0;  
870   MPI_Datatype MPI_CURRENT_TYPE2;
871
872   if(action[3+comm_size]) {
873     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
874     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
875   } else {
876     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
877     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
878   }
879   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
880
881   for(i=0;i<comm_size;i++) {
882     recvcounts[i] = atoi(action[i+3]);
883     recv_sum=recv_sum+recvcounts[i];
884   }
885   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
886
887 #ifdef HAVE_TRACING
888   int rank = smpi_process_index();
889   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
890   extra->type = TRACING_ALLGATHERV;
891   extra->send_size = sendcount;
892   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
893   for(i=0; i< comm_size; i++)//copy data to avoid bad free
894     extra->recvcounts[i] = recvcounts[i];
895   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
896   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
897   extra->num_processes = comm_size;
898
899   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
900 #endif
901
902   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
903
904 #ifdef HAVE_TRACING
905   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
906 #endif
907
908   log_timed_action (action, clock);
909   xbt_free(recvcounts);
910   xbt_free(disps);
911 }
912
913
914 static void action_allToAllv(const char *const *action) {
915   /*
916  The structure of the allToAllV action for the rank 0 (total 4 processes) 
917  is the following:   
918   0 allToAllV 100 1 7 10 12 100 1 70 10 5
919
920   where: 
921   1) 100 is the size of the send buffer *sizeof(int),
922   2) 1 7 10 12 is the sendcounts array
923   3) 100*sizeof(int) is the size of the receiver buffer
924   4)  1 70 10 5 is the recvcounts array
925
926    */
927
928
929   double clock = smpi_process_simulated_elapsed();
930
931   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
932   int send_buf_size=0,recv_buf_size=0,i=0;
933   int *sendcounts = xbt_new0(int, comm_size);  
934   int *recvcounts = xbt_new0(int, comm_size);  
935   int *senddisps = xbt_new0(int, comm_size);  
936   int *recvdisps = xbt_new0(int, comm_size);  
937
938   MPI_Datatype MPI_CURRENT_TYPE2;
939
940   send_buf_size=parse_double(action[2]);
941   recv_buf_size=parse_double(action[3+comm_size]);
942   if(action[4+2*comm_size]) {
943     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
944     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
945   }
946   else {
947       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
948       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
949   }
950
951   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
952   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
953
954   for(i=0;i<comm_size;i++) {
955     sendcounts[i] = atoi(action[i+3]);
956     recvcounts[i] = atoi(action[i+4+comm_size]);
957   }
958
959
960 #ifdef HAVE_TRACING
961   int rank = smpi_process_index();
962   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
963   extra->type = TRACING_ALLTOALLV;
964   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
965   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
966   extra->num_processes = comm_size;
967
968   for(i=0; i< comm_size; i++){//copy data to avoid bad free
969     extra->send_size += sendcounts[i];
970     extra->sendcounts[i] = sendcounts[i];
971     extra->recv_size += recvcounts[i];
972     extra->recvcounts[i] = recvcounts[i];
973   }
974   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
975   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
976
977   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
978 #endif
979   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
980                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
981                                MPI_COMM_WORLD);
982 #ifdef HAVE_TRACING
983   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
984 #endif
985
986   log_timed_action (action, clock);
987   xbt_free(sendcounts);
988   xbt_free(recvcounts);
989   xbt_free(senddisps);
990   xbt_free(recvdisps);
991 }
992
993 void smpi_replay_init(int *argc, char***argv){
994   smpi_process_init(argc, argv);
995   smpi_process_mark_as_initialized();
996   smpi_process_set_replaying(1);
997 #ifdef HAVE_TRACING
998   int rank = smpi_process_index();
999   TRACE_smpi_init(rank);
1000   TRACE_smpi_computing_init(rank);
1001   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1002   extra->type = TRACING_INIT;
1003   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1004   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1005 #endif
1006
1007   if (!smpi_process_index()){
1008     _xbt_replay_action_init();
1009     xbt_replay_action_register("init",       action_init);
1010     xbt_replay_action_register("finalize",   action_finalize);
1011     xbt_replay_action_register("comm_size",  action_comm_size);
1012     xbt_replay_action_register("comm_split", action_comm_split);
1013     xbt_replay_action_register("comm_dup",   action_comm_dup);
1014     xbt_replay_action_register("send",       action_send);
1015     xbt_replay_action_register("Isend",      action_Isend);
1016     xbt_replay_action_register("recv",       action_recv);
1017     xbt_replay_action_register("Irecv",      action_Irecv);
1018     xbt_replay_action_register("test",       action_test);
1019     xbt_replay_action_register("wait",       action_wait);
1020     xbt_replay_action_register("waitAll",    action_waitall);
1021     xbt_replay_action_register("barrier",    action_barrier);
1022     xbt_replay_action_register("bcast",      action_bcast);
1023     xbt_replay_action_register("reduce",     action_reduce);
1024     xbt_replay_action_register("allReduce",  action_allReduce);
1025     xbt_replay_action_register("allToAll",   action_allToAll);
1026     xbt_replay_action_register("allToAllV",  action_allToAllv);
1027     xbt_replay_action_register("gather",  action_gather);
1028     xbt_replay_action_register("gatherV",  action_gatherv);
1029     xbt_replay_action_register("allGatherV",  action_allgatherv);
1030     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1031     xbt_replay_action_register("compute",    action_compute);
1032   }
1033   
1034   //if we have a delayed start, sleep here.
1035   if(*argc>2){
1036     char *endptr;
1037     double value = strtod((*argv)[2], &endptr);
1038     if (*endptr != '\0')
1039       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1040     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1041     smpi_execute_flops(value);
1042   }
1043   xbt_replay_action_runner(*argc, *argv);
1044 }
1045
1046 int smpi_replay_finalize(){
1047   double sim_time= 1.;
1048   /* One active process will stop. Decrease the counter*/
1049   XBT_DEBUG("There are %lu elements in reqq[*]",
1050             xbt_dynar_length(reqq[smpi_process_index()]));
1051   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1052     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1053     MPI_Request requests[count_requests];
1054     MPI_Status status[count_requests];
1055     unsigned int i;
1056
1057     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1058     smpi_mpi_waitall(count_requests, requests, status);
1059     active_processes--;
1060   } else {
1061     active_processes--;
1062   }
1063
1064   if(!active_processes){
1065     /* Last process alive speaking */
1066     /* end the simulated timer */
1067     sim_time = smpi_process_simulated_elapsed();
1068   }
1069   
1070
1071   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1072
1073   if(!active_processes){
1074     XBT_INFO("Simulation time %f", sim_time);
1075     _xbt_replay_action_exit();
1076     xbt_free(sendbuffer);
1077     xbt_free(recvbuffer);
1078     xbt_free(reqq);
1079     reqq = NULL;
1080   }
1081   
1082
1083 #ifdef HAVE_TRACING
1084   int rank = smpi_process_index();
1085   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1086   extra->type = TRACING_FINALIZE;
1087   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1088 #endif
1089   smpi_process_finalize();
1090 #ifdef HAVE_TRACING
1091   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1092   TRACE_smpi_finalize(smpi_process_index());
1093 #endif
1094   smpi_process_destroy();
1095   return MPI_SUCCESS;
1096 }