Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
various fixes about replay and buffer usage
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36   if (!smpi_process_get_replaying())
37         return xbt_malloc(size);
38   if (sendbuffer_size<size){
39     sendbuffer=xbt_realloc(sendbuffer,size);
40     sendbuffer_size=size;
41   }
42   return sendbuffer;
43 }
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46   if (!smpi_process_get_replaying())
47         return xbt_malloc(size);
48   if (recvbuffer_size<size){
49     recvbuffer=xbt_realloc(recvbuffer,size);
50     recvbuffer_size=size;
51   }
52   return sendbuffer;
53 }
54
55 void smpi_free_tmp_buffer(void* buf){
56   if (!smpi_process_get_replaying())
57     xbt_free(buf);
58 }
59
60 /* Helper function */
61 static double parse_double(const char *string)
62 {
63   double value;
64   char *endptr;
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static MPI_Datatype decode_datatype(const char *const action)
72 {
73 // Declared datatypes,
74
75   switch(atoi(action))
76   {
77     case 0:
78       MPI_CURRENT_TYPE=MPI_DOUBLE;
79       break;
80     case 1:
81       MPI_CURRENT_TYPE=MPI_INT;
82       break;
83     case 2:
84       MPI_CURRENT_TYPE=MPI_CHAR;
85       break;
86     case 3:
87       MPI_CURRENT_TYPE=MPI_SHORT;
88       break;
89     case 4:
90       MPI_CURRENT_TYPE=MPI_LONG;
91       break;
92     case 5:
93       MPI_CURRENT_TYPE=MPI_FLOAT;
94       break;
95     case 6:
96       MPI_CURRENT_TYPE=MPI_BYTE;
97       break;
98     default:
99       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
100
101   }
102    return MPI_CURRENT_TYPE;
103 }
104
105
106 const char* encode_datatype(MPI_Datatype datatype)
107 {
108
109   //default type for output is set to MPI_BYTE
110   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111   if (datatype==MPI_BYTE){
112       return "";
113   }
114   if(datatype==MPI_DOUBLE)
115       return "0";
116   if(datatype==MPI_INT)
117       return "1";
118   if(datatype==MPI_CHAR)
119       return "2";
120   if(datatype==MPI_SHORT)
121       return "3";
122   if(datatype==MPI_LONG)
123     return "4";
124   if(datatype==MPI_FLOAT)
125       return "5";
126
127   // default - not implemented.
128   // do not warn here as we pass in this function even for other trace formats
129   return "-1";
130 }
131
132 static void action_init(const char *const *action)
133 {
134   int i;
135   XBT_DEBUG("Initialize the counters");
136
137   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
138   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
139
140   /* start a simulated timer */
141   smpi_process_simulated_start();
142   /*initialize the number of active processes */
143   active_processes = smpi_process_count();
144
145   if (!reqq) {
146     reqq=xbt_new0(xbt_dynar_t,active_processes);
147
148     for(i=0;i<active_processes;i++){
149       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
150     }
151   }
152 }
153
154 static void action_finalize(const char *const *action)
155 {
156 }
157
158 static void action_comm_size(const char *const *action)
159 {
160   double clock = smpi_process_simulated_elapsed();
161
162   communicator_size = parse_double(action[2]);
163   log_timed_action (action, clock);
164 }
165
166 static void action_comm_split(const char *const *action)
167 {
168   double clock = smpi_process_simulated_elapsed();
169
170   log_timed_action (action, clock);
171 }
172
173 static void action_comm_dup(const char *const *action)
174 {
175   double clock = smpi_process_simulated_elapsed();
176
177   log_timed_action (action, clock);
178 }
179
180 static void action_compute(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183   double flops= parse_double(action[2]);
184 #ifdef HAVE_TRACING
185   int rank = smpi_process_index();
186   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187   extra->type=TRACING_COMPUTING;
188   extra->comp_size=flops;
189   TRACE_smpi_computing_in(rank, extra);
190 #endif
191   smpi_execute_flops(flops);
192 #ifdef HAVE_TRACING
193   TRACE_smpi_computing_out(rank);
194 #endif
195
196   log_timed_action (action, clock);
197 }
198
199 static void action_send(const char *const *action)
200 {
201   int to = atoi(action[2]);
202   double size=parse_double(action[3]);
203   double clock = smpi_process_simulated_elapsed();
204
205   if(action[4]) {
206     MPI_CURRENT_TYPE=decode_datatype(action[4]);
207   } else {
208     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
209   }
210
211 #ifdef HAVE_TRACING
212   int rank = smpi_process_index();
213
214   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216   extra->type = TRACING_SEND;
217   extra->send_size = size;
218   extra->src = rank;
219   extra->dst = dst_traced;
220   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
223 #endif
224
225   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
226
227   log_timed_action (action, clock);
228
229   #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231 #endif
232
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process_simulated_elapsed();
240   MPI_Request request;
241
242   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
244
245 #ifdef HAVE_TRACING
246   int rank = smpi_process_index();
247   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249   extra->type = TRACING_ISEND;
250   extra->send_size = size;
251   extra->src = rank;
252   extra->dst = dst_traced;
253   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
256 #endif
257
258   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
259
260 #ifdef HAVE_TRACING
261   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
262   request->send = 1;
263 #endif
264
265   xbt_dynar_push(reqq[smpi_process_index()],&request);
266
267   log_timed_action (action, clock);
268 }
269
270 static void action_recv(const char *const *action) {
271   int from = atoi(action[2]);
272   double size=parse_double(action[3]);
273   double clock = smpi_process_simulated_elapsed();
274   MPI_Status status;
275
276   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279 #ifdef HAVE_TRACING
280   int rank = smpi_process_index();
281   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
282
283   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284   extra->type = TRACING_RECV;
285   extra->send_size = size;
286   extra->src = src_traced;
287   extra->dst = rank;
288   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
290 #endif
291
292   //unknow size from the receiver pov
293   if(size==-1){
294       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
295       size=status.count;
296   }
297
298   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
299
300 #ifdef HAVE_TRACING
301   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302   TRACE_smpi_recv(rank, src_traced, rank);
303 #endif
304
305   log_timed_action (action, clock);
306 }
307
308 static void action_Irecv(const char *const *action)
309 {
310   int from = atoi(action[2]);
311   double size=parse_double(action[3]);
312   double clock = smpi_process_simulated_elapsed();
313   MPI_Request request;
314
315   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
317
318 #ifdef HAVE_TRACING
319   int rank = smpi_process_index();
320   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322   extra->type = TRACING_IRECV;
323   extra->send_size = size;
324   extra->src = src_traced;
325   extra->dst = rank;
326   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
328 #endif
329   MPI_Status status;
330   //unknow size from the receiver pov
331   if(size==-1){
332       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
333       size=status.count;
334   }
335
336   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
340   request->recv = 1;
341 #endif
342   xbt_dynar_push(reqq[smpi_process_index()],&request);
343
344   log_timed_action (action, clock);
345 }
346
347 static void action_test(const char *const *action){
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Request request;
350   MPI_Status status;
351   int flag = TRUE;
352
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354   xbt_assert(request != NULL, "found null request in reqq");
355
356 #ifdef HAVE_TRACING
357   int rank = smpi_process_index();
358   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359   extra->type=TRACING_TEST;
360   TRACE_smpi_testing_in(rank, extra);
361 #endif
362   flag = smpi_mpi_test(&request, &status);
363   XBT_DEBUG("MPI_Test result: %d", flag);
364   /* push back request in dynar to be caught by a subsequent wait. if the test
365    * did succeed, the request is now NULL.
366    */
367   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
368
369 #ifdef HAVE_TRACING
370   TRACE_smpi_testing_out(rank);
371 #endif
372
373   log_timed_action (action, clock);
374 }
375
376 static void action_wait(const char *const *action){
377   double clock = smpi_process_simulated_elapsed();
378   MPI_Request request;
379   MPI_Status status;
380
381   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
382       "action wait not preceded by any irecv or isend: %s",
383       xbt_str_join_array(action," "));
384   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
385
386   if (!request){
387     /* Assuming that the trace is well formed, this mean the comm might have
388      * been caught by a MPI_test. Then just return.
389      */
390     return;
391   }
392
393 #ifdef HAVE_TRACING
394   int rank = request->comm != MPI_COMM_NULL
395       ? smpi_comm_rank(request->comm)
396       : -1;
397
398   MPI_Group group = smpi_comm_group(request->comm);
399   int src_traced = smpi_group_rank(group, request->src);
400   int dst_traced = smpi_group_rank(group, request->dst);
401   int is_wait_for_receive = request->recv;
402   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
403   extra->type = TRACING_WAIT;
404   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
405 #endif
406   smpi_mpi_wait(&request, &status);
407 #ifdef HAVE_TRACING
408   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
409   if (is_wait_for_receive) {
410     TRACE_smpi_recv(rank, src_traced, dst_traced);
411   }
412 #endif
413
414   log_timed_action (action, clock);
415 }
416
417 static void action_waitall(const char *const *action){
418   double clock = smpi_process_simulated_elapsed();
419   int count_requests=0;
420   unsigned int i=0;
421
422   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
423
424   if (count_requests>0) {
425     MPI_Request requests[count_requests];
426     MPI_Status status[count_requests];
427
428     /*  The reqq is an array of dynars. Its index corresponds to the rank.
429      Thus each rank saves its own requests to the array request. */
430     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
431
432   #ifdef HAVE_TRACING
433    //save information from requests
434
435    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
436    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
437    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
438    for (i = 0; i < count_requests; i++) {
439     if(requests[i]){
440       int *asrc = xbt_new(int, 1);
441       int *adst = xbt_new(int, 1);
442       int *arecv = xbt_new(int, 1);
443       *asrc = requests[i]->src;
444       *adst = requests[i]->dst;
445       *arecv = requests[i]->recv;
446       xbt_dynar_insert_at(srcs, i, asrc);
447       xbt_dynar_insert_at(dsts, i, adst);
448       xbt_dynar_insert_at(recvs, i, arecv);
449       xbt_free(asrc);
450       xbt_free(adst);
451       xbt_free(arecv);
452     }else {
453       int *t = xbt_new(int, 1);
454       xbt_dynar_insert_at(srcs, i, t);
455       xbt_dynar_insert_at(dsts, i, t);
456       xbt_dynar_insert_at(recvs, i, t);
457       xbt_free(t);
458     }
459    }
460    int rank_traced = smpi_process_index();
461    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
462    extra->type = TRACING_WAITALL;
463    extra->send_size=count_requests;
464    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
465  #endif
466
467     smpi_mpi_waitall(count_requests, requests, status);
468
469   #ifdef HAVE_TRACING
470    for (i = 0; i < count_requests; i++) {
471     int src_traced, dst_traced, is_wait_for_receive;
472     xbt_dynar_get_cpy(srcs, i, &src_traced);
473     xbt_dynar_get_cpy(dsts, i, &dst_traced);
474     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
475     if (is_wait_for_receive) {
476       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
477     }
478    }
479    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
480    //clean-up of dynars
481    xbt_dynar_free(&srcs);
482    xbt_dynar_free(&dsts);
483    xbt_dynar_free(&recvs);
484   #endif
485
486    xbt_dynar_free_container(&(reqq[smpi_process_index()]));
487   }
488   log_timed_action (action, clock);
489 }
490
491 static void action_barrier(const char *const *action){
492   double clock = smpi_process_simulated_elapsed();
493 #ifdef HAVE_TRACING
494   int rank = smpi_process_index();
495   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
496   extra->type = TRACING_BARRIER;
497   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
498 #endif
499   mpi_coll_barrier_fun(MPI_COMM_WORLD);
500 #ifdef HAVE_TRACING
501   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
502 #endif
503
504   log_timed_action (action, clock);
505 }
506
507
508 static void action_bcast(const char *const *action)
509 {
510   double size = parse_double(action[2]);
511   double clock = smpi_process_simulated_elapsed();
512   int root=0;
513   /*
514    * Initialize MPI_CURRENT_TYPE in order to decrease
515    * the number of the checks
516    * */
517   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
518
519   if(action[3]) {
520     root= atoi(action[3]);
521     if(action[4]) {
522       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
523     }
524   }
525
526 #ifdef HAVE_TRACING
527   int rank = smpi_process_index();
528   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
529
530   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
531   extra->type = TRACING_BCAST;
532   extra->send_size = size;
533   extra->root = root_traced;
534   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
535   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
536
537 #endif
538     void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
539   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
540 #ifdef HAVE_TRACING
541   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
542 #endif
543   log_timed_action (action, clock);
544 }
545
546 static void action_reduce(const char *const *action)
547 {
548   double comm_size = parse_double(action[2]);
549   double comp_size = parse_double(action[3]);
550   double clock = smpi_process_simulated_elapsed();
551   int root=0;
552   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
553
554   if(action[4]) {
555     root= atoi(action[4]);
556     if(action[5]) {
557       MPI_CURRENT_TYPE=decode_datatype(action[5]);
558     }
559   }
560   
561   
562
563 #ifdef HAVE_TRACING
564   int rank = smpi_process_index();
565   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
566   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
567   extra->type = TRACING_REDUCE;
568   extra->send_size = comm_size;
569   extra->comp_size = comp_size;
570   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
571   extra->root = root_traced;
572
573   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
574 #endif
575     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
576     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
577    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
578    smpi_execute_flops(comp_size);
579 #ifdef HAVE_TRACING
580   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
581 #endif
582   log_timed_action (action, clock);
583 }
584
585 static void action_allReduce(const char *const *action) {
586   double comm_size = parse_double(action[2]);
587   double comp_size = parse_double(action[3]);
588
589   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
590   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
591
592   double clock = smpi_process_simulated_elapsed();
593 #ifdef HAVE_TRACING
594   int rank = smpi_process_index();
595   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
596   extra->type = TRACING_ALLREDUCE;
597   extra->send_size = comm_size;
598   extra->comp_size = comp_size;
599   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
600
601   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
602 #endif
603     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
604     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
605   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
606   smpi_execute_flops(comp_size);
607 #ifdef HAVE_TRACING
608   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
609 #endif
610   log_timed_action (action, clock);
611 }
612
613 static void action_allToAll(const char *const *action) {
614   double clock = smpi_process_simulated_elapsed();
615   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
616   int send_size = parse_double(action[2]);
617   int recv_size = parse_double(action[3]);
618   MPI_Datatype MPI_CURRENT_TYPE2;
619
620   if(action[4]) {
621     MPI_CURRENT_TYPE=decode_datatype(action[4]);
622     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
623   }
624   else {
625     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
626     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
627   }
628   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
629   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
630
631 #ifdef HAVE_TRACING
632   int rank = smpi_process_index();
633   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
634   extra->type = TRACING_ALLTOALL;
635   extra->send_size = send_size;
636   extra->recv_size = recv_size;
637   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
638   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
639
640   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
641 #endif
642
643   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
644
645 #ifdef HAVE_TRACING
646   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
647 #endif
648   log_timed_action (action, clock);
649
650 }
651
652
653 static void action_gather(const char *const *action) {
654   /*
655  The structure of the gather action for the rank 0 (total 4 processes) 
656  is the following:   
657  0 gather 68 68 0 0 0
658
659   where: 
660   1) 68 is the sendcounts
661   2) 68 is the recvcounts
662   3) 0 is the root node
663   4) 0 is the send datatype id, see decode_datatype()
664   5) 0 is the recv datatype id, see decode_datatype()
665   */
666   double clock = smpi_process_simulated_elapsed();
667   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
668   int send_size = parse_double(action[2]);
669   int recv_size = parse_double(action[3]);
670   MPI_Datatype MPI_CURRENT_TYPE2;
671   if(action[5]) {
672     MPI_CURRENT_TYPE=decode_datatype(action[5]);
673     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
674   } else {
675     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
676     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
677   }
678   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
679   void *recv = NULL;
680
681   int root=atoi(action[4]);
682   int rank = smpi_comm_rank(MPI_COMM_WORLD);
683
684   if(rank==root)
685     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
686
687 #ifdef HAVE_TRACING
688   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
689   extra->type = TRACING_GATHER;
690   extra->send_size = send_size;
691   extra->recv_size = recv_size;
692   extra->root = root;
693   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
694   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
695
696   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
697 #endif
698   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
699                 recv, recv_size, MPI_CURRENT_TYPE2,
700                 root, MPI_COMM_WORLD);
701
702 #ifdef HAVE_TRACING
703   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
704 #endif
705   log_timed_action (action, clock);
706
707 }
708
709
710
711 static void action_gatherv(const char *const *action) {
712   /*
713  The structure of the gatherv action for the rank 0 (total 4 processes)
714  is the following:
715  0 gather 68 68 10 10 10 0 0 0
716
717   where:
718   1) 68 is the sendcount
719   2) 68 10 10 10 is the recvcounts
720   3) 0 is the root node
721   4) 0 is the send datatype id, see decode_datatype()
722   5) 0 is the recv datatype id, see decode_datatype()
723   */
724   double clock = smpi_process_simulated_elapsed();
725   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
726   int send_size = parse_double(action[2]);
727   int *disps = xbt_new0(int, comm_size);
728   int *recvcounts = xbt_new0(int, comm_size);
729   int i=0,recv_sum=0;
730
731   MPI_Datatype MPI_CURRENT_TYPE2;
732   if(action[4+comm_size]) {
733     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
734     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
735   } else {
736     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
737     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
738   }
739   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
740   void *recv = NULL;
741   for(i=0;i<comm_size;i++) {
742     recvcounts[i] = atoi(action[i+3]);
743     recv_sum=recv_sum+recvcounts[i];
744     disps[i] = 0;
745   }
746
747   int root=atoi(action[3+comm_size]);
748   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
749
750   if(rank==root)
751     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
752
753 #ifdef HAVE_TRACING
754   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
755   extra->type = TRACING_GATHERV;
756   extra->send_size = send_size;
757   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
758   for(i=0; i< comm_size; i++)//copy data to avoid bad free
759     extra->recvcounts[i] = recvcounts[i];
760   extra->root = root;
761   extra->num_processes = comm_size;
762   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
763   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
764
765   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
766 #endif
767 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
768                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
769                 root, MPI_COMM_WORLD);
770
771 #ifdef HAVE_TRACING
772   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
773 #endif
774
775   log_timed_action (action, clock);
776   xbt_free(recvcounts);
777   xbt_free(disps);
778 }
779
780 static void action_reducescatter(const char *const *action) {
781
782     /*
783  The structure of the reducescatter action for the rank 0 (total 4 processes) 
784  is the following:   
785 0 reduceScatter 275427 275427 275427 204020 11346849 0
786
787   where: 
788   1) The first four values after the name of the action declare the recvcounts array
789   2) The value 11346849 is the amount of instructions
790   3) The last value corresponds to the datatype, see decode_datatype().
791
792   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
793
794    */
795
796   double clock = smpi_process_simulated_elapsed();
797   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
798   int comp_size = parse_double(action[2+comm_size]);
799   int *recvcounts = xbt_new0(int, comm_size);  
800   int *disps = xbt_new0(int, comm_size);  
801   int i=0;
802   int rank = smpi_process_index();
803   int size = 0;
804   if(action[3+comm_size])
805     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
806   else
807     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
808
809   for(i=0;i<comm_size;i++) {
810     recvcounts[i] = atoi(action[i+2]);
811     disps[i] = 0;
812     size+=recvcounts[i];
813   }
814
815 #ifdef HAVE_TRACING
816   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
817   extra->type = TRACING_REDUCE_SCATTER;
818   extra->send_size = 0;
819   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
820   for(i=0; i< comm_size; i++)//copy data to avoid bad free
821     extra->recvcounts[i] = recvcounts[i];
822   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
823   extra->comp_size = comp_size;
824   extra->num_processes = comm_size;
825
826   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
827 #endif
828   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
829    void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
830    
831    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
832        MPI_COMM_WORLD);
833    smpi_execute_flops(comp_size);
834
835
836 #ifdef HAVE_TRACING
837   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
838 #endif
839   xbt_free(recvcounts);
840   xbt_free(disps);
841   log_timed_action (action, clock);
842 }
843
844
845 static void action_allgatherv(const char *const *action) {
846
847   /*
848  The structure of the allgatherv action for the rank 0 (total 4 processes) 
849  is the following:   
850 0 allGatherV 275427 275427 275427 275427 204020
851
852   where: 
853   1) 275427 is the sendcount
854   2) The next four elements declare the recvcounts array
855   3) No more values mean that the datatype for sent and receive buffer
856   is the default one, see decode_datatype().
857
858    */
859
860   double clock = smpi_process_simulated_elapsed();
861
862   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
863   int i=0;
864   int sendcount=atoi(action[2]);
865   int *recvcounts = xbt_new0(int, comm_size);  
866   int *disps = xbt_new0(int, comm_size);  
867   int recv_sum=0;  
868   MPI_Datatype MPI_CURRENT_TYPE2;
869
870   if(action[3+comm_size]) {
871     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
872     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
873   } else {
874     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
875     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
876   }
877   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
878
879   for(i=0;i<comm_size;i++) {
880     recvcounts[i] = atoi(action[i+3]);
881     recv_sum=recv_sum+recvcounts[i];
882   }
883   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
884
885 #ifdef HAVE_TRACING
886   int rank = smpi_process_index();
887   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
888   extra->type = TRACING_ALLGATHERV;
889   extra->send_size = sendcount;
890   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
891   for(i=0; i< comm_size; i++)//copy data to avoid bad free
892     extra->recvcounts[i] = recvcounts[i];
893   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
894   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
895   extra->num_processes = comm_size;
896
897   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
898 #endif
899
900   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
901
902 #ifdef HAVE_TRACING
903   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
904 #endif
905
906   log_timed_action (action, clock);
907   xbt_free(recvcounts);
908   xbt_free(disps);
909 }
910
911
912 static void action_allToAllv(const char *const *action) {
913   /*
914  The structure of the allToAllV action for the rank 0 (total 4 processes) 
915  is the following:   
916   0 allToAllV 100 1 7 10 12 100 1 70 10 5
917
918   where: 
919   1) 100 is the size of the send buffer *sizeof(int),
920   2) 1 7 10 12 is the sendcounts array
921   3) 100*sizeof(int) is the size of the receiver buffer
922   4)  1 70 10 5 is the recvcounts array
923
924    */
925
926
927   double clock = smpi_process_simulated_elapsed();
928
929   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
930   int send_buf_size=0,recv_buf_size=0,i=0;
931   int *sendcounts = xbt_new0(int, comm_size);  
932   int *recvcounts = xbt_new0(int, comm_size);  
933   int *senddisps = xbt_new0(int, comm_size);  
934   int *recvdisps = xbt_new0(int, comm_size);  
935
936   MPI_Datatype MPI_CURRENT_TYPE2;
937
938   send_buf_size=parse_double(action[2]);
939   recv_buf_size=parse_double(action[3+comm_size]);
940   if(action[4+2*comm_size]) {
941     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
942     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
943   }
944   else {
945       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
946       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
947   }
948
949   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
950   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
951
952   for(i=0;i<comm_size;i++) {
953     sendcounts[i] = atoi(action[i+3]);
954     recvcounts[i] = atoi(action[i+4+comm_size]);
955   }
956
957
958 #ifdef HAVE_TRACING
959   int rank = smpi_process_index();
960   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
961   extra->type = TRACING_ALLTOALLV;
962   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
963   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
964   extra->num_processes = comm_size;
965
966   for(i=0; i< comm_size; i++){//copy data to avoid bad free
967     extra->send_size += sendcounts[i];
968     extra->sendcounts[i] = sendcounts[i];
969     extra->recv_size += recvcounts[i];
970     extra->recvcounts[i] = recvcounts[i];
971   }
972   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
973   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
974
975   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
976 #endif
977   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
978                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
979                                MPI_COMM_WORLD);
980 #ifdef HAVE_TRACING
981   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
982 #endif
983
984   log_timed_action (action, clock);
985   xbt_free(sendcounts);
986   xbt_free(recvcounts);
987   xbt_free(senddisps);
988   xbt_free(recvdisps);
989 }
990
991 void smpi_replay_init(int *argc, char***argv){
992   smpi_process_init(argc, argv);
993   smpi_process_mark_as_initialized();
994   smpi_process_set_replaying(1);
995 #ifdef HAVE_TRACING
996   int rank = smpi_process_index();
997   TRACE_smpi_init(rank);
998   TRACE_smpi_computing_init(rank);
999   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1000   extra->type = TRACING_INIT;
1001   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1002   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1003 #endif
1004
1005   if (!smpi_process_index()){
1006     _xbt_replay_action_init();
1007     xbt_replay_action_register("init",       action_init);
1008     xbt_replay_action_register("finalize",   action_finalize);
1009     xbt_replay_action_register("comm_size",  action_comm_size);
1010     xbt_replay_action_register("comm_split", action_comm_split);
1011     xbt_replay_action_register("comm_dup",   action_comm_dup);
1012     xbt_replay_action_register("send",       action_send);
1013     xbt_replay_action_register("Isend",      action_Isend);
1014     xbt_replay_action_register("recv",       action_recv);
1015     xbt_replay_action_register("Irecv",      action_Irecv);
1016     xbt_replay_action_register("test",       action_test);
1017     xbt_replay_action_register("wait",       action_wait);
1018     xbt_replay_action_register("waitAll",    action_waitall);
1019     xbt_replay_action_register("barrier",    action_barrier);
1020     xbt_replay_action_register("bcast",      action_bcast);
1021     xbt_replay_action_register("reduce",     action_reduce);
1022     xbt_replay_action_register("allReduce",  action_allReduce);
1023     xbt_replay_action_register("allToAll",   action_allToAll);
1024     xbt_replay_action_register("allToAllV",  action_allToAllv);
1025     xbt_replay_action_register("gather",  action_gather);
1026     xbt_replay_action_register("gatherV",  action_gatherv);
1027     xbt_replay_action_register("allGatherV",  action_allgatherv);
1028     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1029     xbt_replay_action_register("compute",    action_compute);
1030   }
1031   
1032   //if we have a delayed start, sleep here.
1033   if(*argc>2){
1034     char *endptr;
1035     double value = strtod((*argv)[2], &endptr);
1036     if (*endptr != '\0')
1037       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1038     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1039     smpi_execute_flops(value);
1040   }
1041   xbt_replay_action_runner(*argc, *argv);
1042 }
1043
1044 int smpi_replay_finalize(){
1045   double sim_time= 1.;
1046   /* One active process will stop. Decrease the counter*/
1047   XBT_DEBUG("There are %lu elements in reqq[*]",
1048             xbt_dynar_length(reqq[smpi_process_index()]));
1049   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1050     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1051     MPI_Request requests[count_requests];
1052     MPI_Status status[count_requests];
1053     unsigned int i;
1054
1055     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1056     smpi_mpi_waitall(count_requests, requests, status);
1057     active_processes--;
1058   } else {
1059     active_processes--;
1060   }
1061
1062   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1063
1064   if(!active_processes){
1065     /* Last process alive speaking */
1066     /* end the simulated timer */
1067     sim_time = smpi_process_simulated_elapsed();
1068     XBT_INFO("Simulation time %f", sim_time);
1069     _xbt_replay_action_exit();
1070     xbt_free(sendbuffer);
1071     xbt_free(recvbuffer);
1072     xbt_free(reqq);
1073     reqq = NULL;
1074   }
1075   mpi_coll_barrier_fun(MPI_COMM_WORLD);
1076 #ifdef HAVE_TRACING
1077   int rank = smpi_process_index();
1078   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1079   extra->type = TRACING_FINALIZE;
1080   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1081 #endif
1082   smpi_process_finalize();
1083 #ifdef HAVE_TRACING
1084   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1085   TRACE_smpi_finalize(smpi_process_index());
1086 #endif
1087   smpi_process_destroy();
1088   return MPI_SUCCESS;
1089 }