Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
685f7f3b426feb44fe11885ce4f47bb428a1adb6
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36   if (!smpi_process_get_replaying())
37         return xbt_malloc(size);
38   if (sendbuffer_size<size){
39     sendbuffer=xbt_realloc(sendbuffer,size);
40     sendbuffer_size=size;
41   }
42   return sendbuffer;
43 }
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46   if (!smpi_process_get_replaying())
47         return xbt_malloc(size);
48   if (recvbuffer_size<size){
49     recvbuffer=xbt_realloc(recvbuffer,size);
50     recvbuffer_size=size;
51   }
52   return sendbuffer;
53 }
54
55 void smpi_free_tmp_buffer(void* buf){
56   if (!smpi_process_get_replaying())
57     xbt_free(buf);
58 }
59
60 /* Helper function */
61 static double parse_double(const char *string)
62 {
63   double value;
64   char *endptr;
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static MPI_Datatype decode_datatype(const char *const action)
72 {
73 // Declared datatypes,
74
75   switch(atoi(action))
76   {
77     case 0:
78       MPI_CURRENT_TYPE=MPI_DOUBLE;
79       break;
80     case 1:
81       MPI_CURRENT_TYPE=MPI_INT;
82       break;
83     case 2:
84       MPI_CURRENT_TYPE=MPI_CHAR;
85       break;
86     case 3:
87       MPI_CURRENT_TYPE=MPI_SHORT;
88       break;
89     case 4:
90       MPI_CURRENT_TYPE=MPI_LONG;
91       break;
92     case 5:
93       MPI_CURRENT_TYPE=MPI_FLOAT;
94       break;
95     case 6:
96       MPI_CURRENT_TYPE=MPI_BYTE;
97       break;
98     default:
99       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
100
101   }
102    return MPI_CURRENT_TYPE;
103 }
104
105
106 const char* encode_datatype(MPI_Datatype datatype)
107 {
108
109   //default type for output is set to MPI_BYTE
110   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111   if (datatype==MPI_BYTE){
112       return "";
113   }
114   if(datatype==MPI_DOUBLE)
115       return "0";
116   if(datatype==MPI_INT)
117       return "1";
118   if(datatype==MPI_CHAR)
119       return "2";
120   if(datatype==MPI_SHORT)
121       return "3";
122   if(datatype==MPI_LONG)
123     return "4";
124   if(datatype==MPI_FLOAT)
125       return "5";
126
127   // default - not implemented.
128   // do not warn here as we pass in this function even for other trace formats
129   return "-1";
130 }
131
132 static void action_init(const char *const *action)
133 {
134   int i;
135   XBT_DEBUG("Initialize the counters");
136
137   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
138   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
139
140   /* start a simulated timer */
141   smpi_process_simulated_start();
142   /*initialize the number of active processes */
143   active_processes = smpi_process_count();
144
145   if (!reqq) {
146     reqq=xbt_new0(xbt_dynar_t,active_processes);
147
148     for(i=0;i<active_processes;i++){
149       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
150     }
151   }
152 }
153
154 static void action_finalize(const char *const *action)
155 {
156 }
157
158 static void action_comm_size(const char *const *action)
159 {
160   double clock = smpi_process_simulated_elapsed();
161
162   communicator_size = parse_double(action[2]);
163   log_timed_action (action, clock);
164 }
165
166 static void action_comm_split(const char *const *action)
167 {
168   double clock = smpi_process_simulated_elapsed();
169
170   log_timed_action (action, clock);
171 }
172
173 static void action_comm_dup(const char *const *action)
174 {
175   double clock = smpi_process_simulated_elapsed();
176
177   log_timed_action (action, clock);
178 }
179
180 static void action_compute(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183   double flops= parse_double(action[2]);
184 #ifdef HAVE_TRACING
185   int rank = smpi_process_index();
186   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187   extra->type=TRACING_COMPUTING;
188   extra->comp_size=flops;
189   TRACE_smpi_computing_in(rank, extra);
190 #endif
191   smpi_execute_flops(flops);
192 #ifdef HAVE_TRACING
193   TRACE_smpi_computing_out(rank);
194 #endif
195
196   log_timed_action (action, clock);
197 }
198
199 static void action_send(const char *const *action)
200 {
201   int to = atoi(action[2]);
202   double size=parse_double(action[3]);
203   double clock = smpi_process_simulated_elapsed();
204
205   if(action[4]) {
206     MPI_CURRENT_TYPE=decode_datatype(action[4]);
207   } else {
208     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
209   }
210
211 #ifdef HAVE_TRACING
212   int rank = smpi_process_index();
213
214   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216   extra->type = TRACING_SEND;
217   extra->send_size = size;
218   extra->src = rank;
219   extra->dst = dst_traced;
220   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
223 #endif
224
225   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
226
227   log_timed_action (action, clock);
228
229   #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231 #endif
232
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process_simulated_elapsed();
240   MPI_Request request;
241
242   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
244
245 #ifdef HAVE_TRACING
246   int rank = smpi_process_index();
247   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249   extra->type = TRACING_ISEND;
250   extra->send_size = size;
251   extra->src = rank;
252   extra->dst = dst_traced;
253   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
256 #endif
257
258   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
259
260 #ifdef HAVE_TRACING
261   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
262   request->send = 1;
263 #endif
264
265   xbt_dynar_push(reqq[smpi_process_index()],&request);
266
267   log_timed_action (action, clock);
268 }
269
270 static void action_recv(const char *const *action) {
271   int from = atoi(action[2]);
272   double size=parse_double(action[3]);
273   double clock = smpi_process_simulated_elapsed();
274   MPI_Status status;
275
276   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279 #ifdef HAVE_TRACING
280   int rank = smpi_process_index();
281   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
282
283   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284   extra->type = TRACING_RECV;
285   extra->send_size = size;
286   extra->src = src_traced;
287   extra->dst = rank;
288   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
290 #endif
291
292   //unknow size from the receiver pov
293   if(size==-1){
294       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
295       size=status.count;
296   }
297
298   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
299
300 #ifdef HAVE_TRACING
301   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302   TRACE_smpi_recv(rank, src_traced, rank);
303 #endif
304
305   log_timed_action (action, clock);
306 }
307
308 static void action_Irecv(const char *const *action)
309 {
310   int from = atoi(action[2]);
311   double size=parse_double(action[3]);
312   double clock = smpi_process_simulated_elapsed();
313   MPI_Request request;
314
315   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
317
318 #ifdef HAVE_TRACING
319   int rank = smpi_process_index();
320   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322   extra->type = TRACING_IRECV;
323   extra->send_size = size;
324   extra->src = src_traced;
325   extra->dst = rank;
326   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
328 #endif
329   MPI_Status status;
330   //unknow size from the receiver pov
331   if(size==-1){
332       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
333       size=status.count;
334   }
335
336   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
340   request->recv = 1;
341 #endif
342   xbt_dynar_push(reqq[smpi_process_index()],&request);
343
344   log_timed_action (action, clock);
345 }
346
347 static void action_test(const char *const *action){
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Request request;
350   MPI_Status status;
351   int flag = TRUE;
352
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354   xbt_assert(request != NULL, "found null request in reqq");
355
356 #ifdef HAVE_TRACING
357   int rank = smpi_process_index();
358   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359   extra->type=TRACING_TEST;
360   TRACE_smpi_testing_in(rank, extra);
361 #endif
362   flag = smpi_mpi_test(&request, &status);
363   XBT_DEBUG("MPI_Test result: %d", flag);
364   /* push back request in dynar to be caught by a subsequent wait. if the test
365    * did succeed, the request is now NULL.
366    */
367    if(request)
368   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
369
370 #ifdef HAVE_TRACING
371   TRACE_smpi_testing_out(rank);
372 #endif
373
374   log_timed_action (action, clock);
375 }
376
377 static void action_wait(const char *const *action){
378   double clock = smpi_process_simulated_elapsed();
379   MPI_Request request;
380   MPI_Status status;
381
382   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
383       "action wait not preceded by any irecv or isend: %s",
384       xbt_str_join_array(action," "));
385   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
386
387   if (!request){
388     /* Assuming that the trace is well formed, this mean the comm might have
389      * been caught by a MPI_test. Then just return.
390      */
391     return;
392   }
393
394 #ifdef HAVE_TRACING
395   int rank = request->comm != MPI_COMM_NULL
396       ? smpi_comm_rank(request->comm)
397       : -1;
398
399   MPI_Group group = smpi_comm_group(request->comm);
400   int src_traced = smpi_group_rank(group, request->src);
401   int dst_traced = smpi_group_rank(group, request->dst);
402   int is_wait_for_receive = request->recv;
403   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
404   extra->type = TRACING_WAIT;
405   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
406 #endif
407   smpi_mpi_wait(&request, &status);
408 #ifdef HAVE_TRACING
409   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
410   if (is_wait_for_receive) {
411     TRACE_smpi_recv(rank, src_traced, dst_traced);
412   }
413 #endif
414
415   log_timed_action (action, clock);
416 }
417
418 static void action_waitall(const char *const *action){
419   double clock = smpi_process_simulated_elapsed();
420   int count_requests=0;
421   unsigned int i=0;
422
423   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
424
425   if (count_requests>0) {
426     MPI_Request requests[count_requests];
427     MPI_Status status[count_requests];
428
429     /*  The reqq is an array of dynars. Its index corresponds to the rank.
430      Thus each rank saves its own requests to the array request. */
431     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
432
433   #ifdef HAVE_TRACING
434    //save information from requests
435
436    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
437    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
438    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
439    for (i = 0; i < count_requests; i++) {
440     if(requests[i]){
441       int *asrc = xbt_new(int, 1);
442       int *adst = xbt_new(int, 1);
443       int *arecv = xbt_new(int, 1);
444       *asrc = requests[i]->src;
445       *adst = requests[i]->dst;
446       *arecv = requests[i]->recv;
447       xbt_dynar_insert_at(srcs, i, asrc);
448       xbt_dynar_insert_at(dsts, i, adst);
449       xbt_dynar_insert_at(recvs, i, arecv);
450       xbt_free(asrc);
451       xbt_free(adst);
452       xbt_free(arecv);
453     }else {
454       int *t = xbt_new(int, 1);
455       xbt_dynar_insert_at(srcs, i, t);
456       xbt_dynar_insert_at(dsts, i, t);
457       xbt_dynar_insert_at(recvs, i, t);
458       xbt_free(t);
459     }
460    }
461    int rank_traced = smpi_process_index();
462    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
463    extra->type = TRACING_WAITALL;
464    extra->send_size=count_requests;
465    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
466  #endif
467
468     smpi_mpi_waitall(count_requests, requests, status);
469
470   #ifdef HAVE_TRACING
471    for (i = 0; i < count_requests; i++) {
472     int src_traced, dst_traced, is_wait_for_receive;
473     xbt_dynar_get_cpy(srcs, i, &src_traced);
474     xbt_dynar_get_cpy(dsts, i, &dst_traced);
475     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
476     if (is_wait_for_receive) {
477       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
478     }
479    }
480    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
481    //clean-up of dynars
482    xbt_dynar_free(&srcs);
483    xbt_dynar_free(&dsts);
484    xbt_dynar_free(&recvs);
485   #endif
486
487    int freedrank=smpi_process_index();
488    xbt_dynar_free_container(&(reqq[freedrank]));
489    reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
490   }
491   log_timed_action (action, clock);
492 }
493
494 static void action_barrier(const char *const *action){
495   double clock = smpi_process_simulated_elapsed();
496 #ifdef HAVE_TRACING
497   int rank = smpi_process_index();
498   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
499   extra->type = TRACING_BARRIER;
500   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
501 #endif
502   mpi_coll_barrier_fun(MPI_COMM_WORLD);
503 #ifdef HAVE_TRACING
504   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
505 #endif
506
507   log_timed_action (action, clock);
508 }
509
510
511 static void action_bcast(const char *const *action)
512 {
513   double size = parse_double(action[2]);
514   double clock = smpi_process_simulated_elapsed();
515   int root=0;
516   /*
517    * Initialize MPI_CURRENT_TYPE in order to decrease
518    * the number of the checks
519    * */
520   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
521
522   if(action[3]) {
523     root= atoi(action[3]);
524     if(action[4]) {
525       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
526     }
527   }
528
529 #ifdef HAVE_TRACING
530   int rank = smpi_process_index();
531   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
532
533   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
534   extra->type = TRACING_BCAST;
535   extra->send_size = size;
536   extra->root = root_traced;
537   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
538   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
539
540 #endif
541     void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
542   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
543 #ifdef HAVE_TRACING
544   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
545 #endif
546   log_timed_action (action, clock);
547 }
548
549 static void action_reduce(const char *const *action)
550 {
551   double comm_size = parse_double(action[2]);
552   double comp_size = parse_double(action[3]);
553   double clock = smpi_process_simulated_elapsed();
554   int root=0;
555   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
556
557   if(action[4]) {
558     root= atoi(action[4]);
559     if(action[5]) {
560       MPI_CURRENT_TYPE=decode_datatype(action[5]);
561     }
562   }
563   
564   
565
566 #ifdef HAVE_TRACING
567   int rank = smpi_process_index();
568   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
569   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
570   extra->type = TRACING_REDUCE;
571   extra->send_size = comm_size;
572   extra->comp_size = comp_size;
573   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
574   extra->root = root_traced;
575
576   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
577 #endif
578     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
579     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
580    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
581    smpi_execute_flops(comp_size);
582 #ifdef HAVE_TRACING
583   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
584 #endif
585   log_timed_action (action, clock);
586 }
587
588 static void action_allReduce(const char *const *action) {
589   double comm_size = parse_double(action[2]);
590   double comp_size = parse_double(action[3]);
591
592   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
593   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
594
595   double clock = smpi_process_simulated_elapsed();
596 #ifdef HAVE_TRACING
597   int rank = smpi_process_index();
598   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
599   extra->type = TRACING_ALLREDUCE;
600   extra->send_size = comm_size;
601   extra->comp_size = comp_size;
602   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
603
604   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
605 #endif
606     void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
607     void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
608   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
609   smpi_execute_flops(comp_size);
610 #ifdef HAVE_TRACING
611   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
612 #endif
613   log_timed_action (action, clock);
614 }
615
616 static void action_allToAll(const char *const *action) {
617   double clock = smpi_process_simulated_elapsed();
618   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
619   int send_size = parse_double(action[2]);
620   int recv_size = parse_double(action[3]);
621   MPI_Datatype MPI_CURRENT_TYPE2;
622
623   if(action[4]) {
624     MPI_CURRENT_TYPE=decode_datatype(action[4]);
625     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
626   }
627   else {
628     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
629     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
630   }
631   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
632   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
633
634 #ifdef HAVE_TRACING
635   int rank = smpi_process_index();
636   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
637   extra->type = TRACING_ALLTOALL;
638   extra->send_size = send_size;
639   extra->recv_size = recv_size;
640   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
641   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
642
643   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
644 #endif
645
646   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
647
648 #ifdef HAVE_TRACING
649   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
650 #endif
651   log_timed_action (action, clock);
652
653 }
654
655
656 static void action_gather(const char *const *action) {
657   /*
658  The structure of the gather action for the rank 0 (total 4 processes) 
659  is the following:   
660  0 gather 68 68 0 0 0
661
662   where: 
663   1) 68 is the sendcounts
664   2) 68 is the recvcounts
665   3) 0 is the root node
666   4) 0 is the send datatype id, see decode_datatype()
667   5) 0 is the recv datatype id, see decode_datatype()
668   */
669   double clock = smpi_process_simulated_elapsed();
670   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
671   int send_size = parse_double(action[2]);
672   int recv_size = parse_double(action[3]);
673   MPI_Datatype MPI_CURRENT_TYPE2;
674   if(action[5]) {
675     MPI_CURRENT_TYPE=decode_datatype(action[5]);
676     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
677   } else {
678     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
679     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
680   }
681   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
682   void *recv = NULL;
683
684   int root=atoi(action[4]);
685   int rank = smpi_comm_rank(MPI_COMM_WORLD);
686
687   if(rank==root)
688     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
689
690 #ifdef HAVE_TRACING
691   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
692   extra->type = TRACING_GATHER;
693   extra->send_size = send_size;
694   extra->recv_size = recv_size;
695   extra->root = root;
696   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
697   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
698
699   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
700 #endif
701   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
702                 recv, recv_size, MPI_CURRENT_TYPE2,
703                 root, MPI_COMM_WORLD);
704
705 #ifdef HAVE_TRACING
706   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
707 #endif
708   log_timed_action (action, clock);
709
710 }
711
712
713
714 static void action_gatherv(const char *const *action) {
715   /*
716  The structure of the gatherv action for the rank 0 (total 4 processes)
717  is the following:
718  0 gather 68 68 10 10 10 0 0 0
719
720   where:
721   1) 68 is the sendcount
722   2) 68 10 10 10 is the recvcounts
723   3) 0 is the root node
724   4) 0 is the send datatype id, see decode_datatype()
725   5) 0 is the recv datatype id, see decode_datatype()
726   */
727   double clock = smpi_process_simulated_elapsed();
728   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
729   int send_size = parse_double(action[2]);
730   int *disps = xbt_new0(int, comm_size);
731   int *recvcounts = xbt_new0(int, comm_size);
732   int i=0,recv_sum=0;
733
734   MPI_Datatype MPI_CURRENT_TYPE2;
735   if(action[4+comm_size]) {
736     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
737     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
738   } else {
739     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
740     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
741   }
742   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
743   void *recv = NULL;
744   for(i=0;i<comm_size;i++) {
745     recvcounts[i] = atoi(action[i+3]);
746     recv_sum=recv_sum+recvcounts[i];
747     disps[i] = 0;
748   }
749
750   int root=atoi(action[3+comm_size]);
751   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
752
753   if(rank==root)
754     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
755
756 #ifdef HAVE_TRACING
757   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
758   extra->type = TRACING_GATHERV;
759   extra->send_size = send_size;
760   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
761   for(i=0; i< comm_size; i++)//copy data to avoid bad free
762     extra->recvcounts[i] = recvcounts[i];
763   extra->root = root;
764   extra->num_processes = comm_size;
765   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
766   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
767
768   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
769 #endif
770 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
771                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
772                 root, MPI_COMM_WORLD);
773
774 #ifdef HAVE_TRACING
775   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
776 #endif
777
778   log_timed_action (action, clock);
779   xbt_free(recvcounts);
780   xbt_free(disps);
781 }
782
783 static void action_reducescatter(const char *const *action) {
784
785     /*
786  The structure of the reducescatter action for the rank 0 (total 4 processes) 
787  is the following:   
788 0 reduceScatter 275427 275427 275427 204020 11346849 0
789
790   where: 
791   1) The first four values after the name of the action declare the recvcounts array
792   2) The value 11346849 is the amount of instructions
793   3) The last value corresponds to the datatype, see decode_datatype().
794
795   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
796
797    */
798
799   double clock = smpi_process_simulated_elapsed();
800   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
801   int comp_size = parse_double(action[2+comm_size]);
802   int *recvcounts = xbt_new0(int, comm_size);  
803   int *disps = xbt_new0(int, comm_size);  
804   int i=0;
805   int rank = smpi_process_index();
806   int size = 0;
807   if(action[3+comm_size])
808     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
809   else
810     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
811
812   for(i=0;i<comm_size;i++) {
813     recvcounts[i] = atoi(action[i+2]);
814     disps[i] = 0;
815     size+=recvcounts[i];
816   }
817
818 #ifdef HAVE_TRACING
819   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
820   extra->type = TRACING_REDUCE_SCATTER;
821   extra->send_size = 0;
822   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
823   for(i=0; i< comm_size; i++)//copy data to avoid bad free
824     extra->recvcounts[i] = recvcounts[i];
825   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
826   extra->comp_size = comp_size;
827   extra->num_processes = comm_size;
828
829   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
830 #endif
831   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
832    void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
833    
834    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
835        MPI_COMM_WORLD);
836    smpi_execute_flops(comp_size);
837
838
839 #ifdef HAVE_TRACING
840   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
841 #endif
842   xbt_free(recvcounts);
843   xbt_free(disps);
844   log_timed_action (action, clock);
845 }
846
847
848 static void action_allgatherv(const char *const *action) {
849
850   /*
851  The structure of the allgatherv action for the rank 0 (total 4 processes) 
852  is the following:   
853 0 allGatherV 275427 275427 275427 275427 204020
854
855   where: 
856   1) 275427 is the sendcount
857   2) The next four elements declare the recvcounts array
858   3) No more values mean that the datatype for sent and receive buffer
859   is the default one, see decode_datatype().
860
861    */
862
863   double clock = smpi_process_simulated_elapsed();
864
865   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
866   int i=0;
867   int sendcount=atoi(action[2]);
868   int *recvcounts = xbt_new0(int, comm_size);  
869   int *disps = xbt_new0(int, comm_size);  
870   int recv_sum=0;  
871   MPI_Datatype MPI_CURRENT_TYPE2;
872
873   if(action[3+comm_size]) {
874     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
875     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
876   } else {
877     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
878     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
879   }
880   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
881
882   for(i=0;i<comm_size;i++) {
883     recvcounts[i] = atoi(action[i+3]);
884     recv_sum=recv_sum+recvcounts[i];
885   }
886   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
887
888 #ifdef HAVE_TRACING
889   int rank = smpi_process_index();
890   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
891   extra->type = TRACING_ALLGATHERV;
892   extra->send_size = sendcount;
893   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
894   for(i=0; i< comm_size; i++)//copy data to avoid bad free
895     extra->recvcounts[i] = recvcounts[i];
896   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
897   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
898   extra->num_processes = comm_size;
899
900   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
901 #endif
902
903   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
904
905 #ifdef HAVE_TRACING
906   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
907 #endif
908
909   log_timed_action (action, clock);
910   xbt_free(recvcounts);
911   xbt_free(disps);
912 }
913
914
915 static void action_allToAllv(const char *const *action) {
916   /*
917  The structure of the allToAllV action for the rank 0 (total 4 processes) 
918  is the following:   
919   0 allToAllV 100 1 7 10 12 100 1 70 10 5
920
921   where: 
922   1) 100 is the size of the send buffer *sizeof(int),
923   2) 1 7 10 12 is the sendcounts array
924   3) 100*sizeof(int) is the size of the receiver buffer
925   4)  1 70 10 5 is the recvcounts array
926
927    */
928
929
930   double clock = smpi_process_simulated_elapsed();
931
932   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
933   int send_buf_size=0,recv_buf_size=0,i=0;
934   int *sendcounts = xbt_new0(int, comm_size);  
935   int *recvcounts = xbt_new0(int, comm_size);  
936   int *senddisps = xbt_new0(int, comm_size);  
937   int *recvdisps = xbt_new0(int, comm_size);  
938
939   MPI_Datatype MPI_CURRENT_TYPE2;
940
941   send_buf_size=parse_double(action[2]);
942   recv_buf_size=parse_double(action[3+comm_size]);
943   if(action[4+2*comm_size]) {
944     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
945     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
946   }
947   else {
948       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
949       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
950   }
951
952   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
953   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
954
955   for(i=0;i<comm_size;i++) {
956     sendcounts[i] = atoi(action[i+3]);
957     recvcounts[i] = atoi(action[i+4+comm_size]);
958   }
959
960
961 #ifdef HAVE_TRACING
962   int rank = smpi_process_index();
963   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
964   extra->type = TRACING_ALLTOALLV;
965   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
966   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
967   extra->num_processes = comm_size;
968
969   for(i=0; i< comm_size; i++){//copy data to avoid bad free
970     extra->send_size += sendcounts[i];
971     extra->sendcounts[i] = sendcounts[i];
972     extra->recv_size += recvcounts[i];
973     extra->recvcounts[i] = recvcounts[i];
974   }
975   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
976   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
977
978   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
979 #endif
980   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
981                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
982                                MPI_COMM_WORLD);
983 #ifdef HAVE_TRACING
984   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
985 #endif
986
987   log_timed_action (action, clock);
988   xbt_free(sendcounts);
989   xbt_free(recvcounts);
990   xbt_free(senddisps);
991   xbt_free(recvdisps);
992 }
993
994 void smpi_replay_init(int *argc, char***argv){
995   smpi_process_init(argc, argv);
996   smpi_process_mark_as_initialized();
997   smpi_process_set_replaying(1);
998 #ifdef HAVE_TRACING
999   int rank = smpi_process_index();
1000   TRACE_smpi_init(rank);
1001   TRACE_smpi_computing_init(rank);
1002   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1003   extra->type = TRACING_INIT;
1004   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1005   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1006 #endif
1007
1008   if (!smpi_process_index()){
1009     _xbt_replay_action_init();
1010     xbt_replay_action_register("init",       action_init);
1011     xbt_replay_action_register("finalize",   action_finalize);
1012     xbt_replay_action_register("comm_size",  action_comm_size);
1013     xbt_replay_action_register("comm_split", action_comm_split);
1014     xbt_replay_action_register("comm_dup",   action_comm_dup);
1015     xbt_replay_action_register("send",       action_send);
1016     xbt_replay_action_register("Isend",      action_Isend);
1017     xbt_replay_action_register("recv",       action_recv);
1018     xbt_replay_action_register("Irecv",      action_Irecv);
1019     xbt_replay_action_register("test",       action_test);
1020     xbt_replay_action_register("wait",       action_wait);
1021     xbt_replay_action_register("waitAll",    action_waitall);
1022     xbt_replay_action_register("barrier",    action_barrier);
1023     xbt_replay_action_register("bcast",      action_bcast);
1024     xbt_replay_action_register("reduce",     action_reduce);
1025     xbt_replay_action_register("allReduce",  action_allReduce);
1026     xbt_replay_action_register("allToAll",   action_allToAll);
1027     xbt_replay_action_register("allToAllV",  action_allToAllv);
1028     xbt_replay_action_register("gather",  action_gather);
1029     xbt_replay_action_register("gatherV",  action_gatherv);
1030     xbt_replay_action_register("allGatherV",  action_allgatherv);
1031     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1032     xbt_replay_action_register("compute",    action_compute);
1033   }
1034   
1035   //if we have a delayed start, sleep here.
1036   if(*argc>2){
1037     char *endptr;
1038     double value = strtod((*argv)[2], &endptr);
1039     if (*endptr != '\0')
1040       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1041     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1042     smpi_execute_flops(value);
1043   }
1044   xbt_replay_action_runner(*argc, *argv);
1045 }
1046
1047 int smpi_replay_finalize(){
1048   double sim_time= 1.;
1049   /* One active process will stop. Decrease the counter*/
1050   XBT_DEBUG("There are %lu elements in reqq[*]",
1051             xbt_dynar_length(reqq[smpi_process_index()]));
1052   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1053     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1054     MPI_Request requests[count_requests];
1055     MPI_Status status[count_requests];
1056     unsigned int i;
1057
1058     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1059     smpi_mpi_waitall(count_requests, requests, status);
1060     active_processes--;
1061   } else {
1062     active_processes--;
1063   }
1064
1065   if(!active_processes){
1066     /* Last process alive speaking */
1067     /* end the simulated timer */
1068     sim_time = smpi_process_simulated_elapsed();
1069   }
1070   
1071
1072   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1073
1074   if(!active_processes){
1075     XBT_INFO("Simulation time %f", sim_time);
1076     _xbt_replay_action_exit();
1077     xbt_free(sendbuffer);
1078     xbt_free(recvbuffer);
1079     xbt_free(reqq);
1080     reqq = NULL;
1081   }
1082   
1083
1084 #ifdef HAVE_TRACING
1085   int rank = smpi_process_index();
1086   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1087   extra->type = TRACING_FINALIZE;
1088   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1089 #endif
1090   smpi_process_finalize();
1091 #ifdef HAVE_TRACING
1092   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1093   TRACE_smpi_finalize(smpi_process_index());
1094 #endif
1095   smpi_process_destroy();
1096   return MPI_SUCCESS;
1097 }