Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
SMPI replay: make sure that the replay is inited even if rank 0 is not first for...
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15
16 int communicator_size = 0;
17 static int active_processes = 0;
18 xbt_dict_t reqq = NULL;
19
20 MPI_Datatype MPI_DEFAULT_TYPE;
21 MPI_Datatype MPI_CURRENT_TYPE;
22
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
27
28 static void log_timed_action (const char *const *action, double clock){
29   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30     char *name = xbt_str_join_array(action, " ");
31     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
32     free(name);
33   }
34 }
35
36
37 static xbt_dynar_t get_reqq_self(){
38    char * key;
39    
40    asprintf(&key, "%d", smpi_process_index());
41    xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
42    free(key);
43  
44    return dynar_mpi_request;
45 }
46
47 static void set_reqq_self(xbt_dynar_t mpi_request){
48    char * key;
49    
50    asprintf(&key, "%d", smpi_process_index());
51    xbt_dict_set(reqq, key, mpi_request, free);
52    free(key);
53 }
54
55
56 //allocate a single buffer for all sends, growing it if needed
57 void* smpi_get_tmp_sendbuffer(int size){
58   if (!smpi_process_get_replaying())
59         return xbt_malloc(size);
60   if (sendbuffer_size<size){
61     sendbuffer=xbt_realloc(sendbuffer,size);
62     sendbuffer_size=size;
63   }
64   return sendbuffer;
65 }
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (!smpi_process_get_replaying())
69         return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=xbt_realloc(recvbuffer,size);
72     recvbuffer_size=size;
73   }
74   return sendbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (!smpi_process_get_replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   double value;
86   char *endptr;
87   value = strtod(string, &endptr);
88   if (*endptr != '\0')
89     THROWF(unknown_error, 0, "%s is not a double", string);
90   return value;
91 }
92
93 static MPI_Datatype decode_datatype(const char *const action)
94 {
95 // Declared datatypes,
96
97   switch(atoi(action))
98   {
99     case 0:
100       MPI_CURRENT_TYPE=MPI_DOUBLE;
101       break;
102     case 1:
103       MPI_CURRENT_TYPE=MPI_INT;
104       break;
105     case 2:
106       MPI_CURRENT_TYPE=MPI_CHAR;
107       break;
108     case 3:
109       MPI_CURRENT_TYPE=MPI_SHORT;
110       break;
111     case 4:
112       MPI_CURRENT_TYPE=MPI_LONG;
113       break;
114     case 5:
115       MPI_CURRENT_TYPE=MPI_FLOAT;
116       break;
117     case 6:
118       MPI_CURRENT_TYPE=MPI_BYTE;
119       break;
120     default:
121       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
122
123   }
124    return MPI_CURRENT_TYPE;
125 }
126
127
128 const char* encode_datatype(MPI_Datatype datatype, int* known)
129 {
130
131   //default type for output is set to MPI_BYTE
132   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
133   if(known)*known=1;
134   if (datatype==MPI_BYTE){
135       return "";
136   }
137   if(datatype==MPI_DOUBLE)
138       return "0";
139   if(datatype==MPI_INT)
140       return "1";
141   if(datatype==MPI_CHAR)
142       return "2";
143   if(datatype==MPI_SHORT)
144       return "3";
145   if(datatype==MPI_LONG)
146     return "4";
147   if(datatype==MPI_FLOAT)
148       return "5";
149   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
150   if(known)*known=0;
151   // default - not implemented.
152   // do not warn here as we pass in this function even for other trace formats
153   return "-1";
154 }
155
156 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
157     int i=0;\
158     while(action[i]!=NULL)\
159      i++;\
160     if(i<mandatory+2)                                           \
161     THROWF(arg_error, 0, "%s replay failed.\n" \
162           "%d items were given on the line. First two should be process_id and action.  " \
163           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
164           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
165   }
166
167
168 static void action_init(const char *const *action)
169 {
170   XBT_DEBUG("Initialize the counters");
171   CHECK_ACTION_PARAMS(action, 0, 1);
172   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
173   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
174
175   /* start a simulated timer */
176   smpi_process_simulated_start();
177   /*initialize the number of active processes */
178   active_processes = smpi_process_count();
179
180   if (!reqq) {
181     reqq = xbt_dict_new();
182   }
183
184   set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
185
186   /*
187     reqq=xbt_new0(xbt_dynar_t,active_processes);
188
189     for(i=0;i<active_processes;i++){
190       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
191     }
192   }
193   */
194 }
195
196 static void action_finalize(const char *const *action)
197 {
198 }
199
200 static void action_comm_size(const char *const *action)
201 {
202   double clock = smpi_process_simulated_elapsed();
203
204   communicator_size = parse_double(action[2]);
205   log_timed_action (action, clock);
206 }
207
208 static void action_comm_split(const char *const *action)
209 {
210   double clock = smpi_process_simulated_elapsed();
211
212   log_timed_action (action, clock);
213 }
214
215 static void action_comm_dup(const char *const *action)
216 {
217   double clock = smpi_process_simulated_elapsed();
218
219   log_timed_action (action, clock);
220 }
221
222 static void action_compute(const char *const *action)
223 {
224   CHECK_ACTION_PARAMS(action, 1, 0);
225   double clock = smpi_process_simulated_elapsed();
226   double flops= parse_double(action[2]);
227   int rank = smpi_process_index();
228   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
229   extra->type=TRACING_COMPUTING;
230   extra->comp_size=flops;
231   TRACE_smpi_computing_in(rank, extra);
232
233   smpi_execute_flops(flops);
234
235   TRACE_smpi_computing_out(rank);
236   log_timed_action (action, clock);
237 }
238
239 static void action_send(const char *const *action)
240 {
241   CHECK_ACTION_PARAMS(action, 2, 1);
242   int to = atoi(action[2]);
243   double size=parse_double(action[3]);
244   double clock = smpi_process_simulated_elapsed();
245
246   if(action[4]) {
247     MPI_CURRENT_TYPE=decode_datatype(action[4]);
248   } else {
249     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250   }
251
252   int rank = smpi_process_index();
253
254   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
255   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
256   extra->type = TRACING_SEND;
257   extra->send_size = size;
258   extra->src = rank;
259   extra->dst = dst_traced;
260   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
261   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
262   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
263
264   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
265
266   log_timed_action (action, clock);
267
268   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
269 }
270
271 static void action_Isend(const char *const *action)
272 {
273   CHECK_ACTION_PARAMS(action, 2, 1);
274   int to = atoi(action[2]);
275   double size=parse_double(action[3]);
276   double clock = smpi_process_simulated_elapsed();
277   MPI_Request request;
278
279   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
280   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
281
282   int rank = smpi_process_index();
283   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
284   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
285   extra->type = TRACING_ISEND;
286   extra->send_size = size;
287   extra->src = rank;
288   extra->dst = dst_traced;
289   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
290   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
291   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
292
293   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
294
295   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
296   request->send = 1;
297
298   xbt_dynar_push(get_reqq_self(),&request);
299
300   log_timed_action (action, clock);
301 }
302
303 static void action_recv(const char *const *action) {
304   CHECK_ACTION_PARAMS(action, 2, 1);
305   int from = atoi(action[2]);
306   double size=parse_double(action[3]);
307   double clock = smpi_process_simulated_elapsed();
308   MPI_Status status;
309
310   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
311   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
312
313   int rank = smpi_process_index();
314   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
315
316   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
317   extra->type = TRACING_RECV;
318   extra->send_size = size;
319   extra->src = src_traced;
320   extra->dst = rank;
321   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
322   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
323
324   //unknow size from the receiver pov
325   if(size==-1){
326       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
327       size=status.count;
328   }
329
330   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
331
332   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
333   TRACE_smpi_recv(rank, src_traced, rank);
334
335   log_timed_action (action, clock);
336 }
337
338 static void action_Irecv(const char *const *action)
339 {
340   CHECK_ACTION_PARAMS(action, 2, 1);
341   int from = atoi(action[2]);
342   double size=parse_double(action[3]);
343   double clock = smpi_process_simulated_elapsed();
344   MPI_Request request;
345
346   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
347   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
348
349   int rank = smpi_process_index();
350   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
351   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
352   extra->type = TRACING_IRECV;
353   extra->send_size = size;
354   extra->src = src_traced;
355   extra->dst = rank;
356   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
357   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
358   MPI_Status status;
359   //unknow size from the receiver pov
360   if(size==-1){
361       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
362       size=status.count;
363   }
364
365   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
366
367   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
368   request->recv = 1;
369   xbt_dynar_push(get_reqq_self(),&request);
370
371   log_timed_action (action, clock);
372 }
373
374 static void action_test(const char *const *action){
375   CHECK_ACTION_PARAMS(action, 0, 0);
376   double clock = smpi_process_simulated_elapsed();
377   MPI_Request request;
378   MPI_Status status;
379   int flag = TRUE;
380
381   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
382   //if request is null here, this may mean that a previous test has succeeded 
383   //Different times in traced application and replayed version may lead to this 
384   //In this case, ignore the extra calls.
385   if(request){
386           int rank = smpi_process_index();
387           instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
388           extra->type=TRACING_TEST;
389           TRACE_smpi_testing_in(rank, extra);
390
391           flag = smpi_mpi_test(&request, &status);
392
393           XBT_DEBUG("MPI_Test result: %d", flag);
394           /* push back request in dynar to be caught by a subsequent wait. if the test
395            * did succeed, the request is now NULL.
396            */
397           xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
398
399           TRACE_smpi_testing_out(rank);
400   }
401   log_timed_action (action, clock);
402 }
403
404 static void action_wait(const char *const *action){
405   CHECK_ACTION_PARAMS(action, 0, 0);
406   double clock = smpi_process_simulated_elapsed();
407   MPI_Request request;
408   MPI_Status status;
409
410   xbt_assert(xbt_dynar_length(get_reqq_self()),
411       "action wait not preceded by any irecv or isend: %s",
412       xbt_str_join_array(action," "));
413   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
414
415   if (!request){
416     /* Assuming that the trace is well formed, this mean the comm might have
417      * been caught by a MPI_test. Then just return.
418      */
419     return;
420   }
421
422   int rank = request->comm != MPI_COMM_NULL
423       ? smpi_comm_rank(request->comm)
424       : -1;
425
426   MPI_Group group = smpi_comm_group(request->comm);
427   int src_traced = smpi_group_rank(group, request->src);
428   int dst_traced = smpi_group_rank(group, request->dst);
429   int is_wait_for_receive = request->recv;
430   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
431   extra->type = TRACING_WAIT;
432   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
433
434   smpi_mpi_wait(&request, &status);
435
436   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
437   if (is_wait_for_receive)
438     TRACE_smpi_recv(rank, src_traced, dst_traced);
439   log_timed_action (action, clock);
440 }
441
442 static void action_waitall(const char *const *action){
443   CHECK_ACTION_PARAMS(action, 0, 0);
444   double clock = smpi_process_simulated_elapsed();
445   int count_requests=0;
446   unsigned int i=0;
447
448   count_requests=xbt_dynar_length(get_reqq_self());
449
450   if (count_requests>0) {
451     MPI_Request requests[count_requests];
452     MPI_Status status[count_requests];
453
454     /*  The reqq is an array of dynars. Its index corresponds to the rank.
455      Thus each rank saves its own requests to the array request. */
456     xbt_dynar_foreach(get_reqq_self(),i,requests[i]); 
457
458    //save information from requests
459
460    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
461    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
462    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
463    for (i = 0; i < count_requests; i++) {
464     if(requests[i]){
465       int *asrc = xbt_new(int, 1);
466       int *adst = xbt_new(int, 1);
467       int *arecv = xbt_new(int, 1);
468       *asrc = requests[i]->src;
469       *adst = requests[i]->dst;
470       *arecv = requests[i]->recv;
471       xbt_dynar_insert_at(srcs, i, asrc);
472       xbt_dynar_insert_at(dsts, i, adst);
473       xbt_dynar_insert_at(recvs, i, arecv);
474       xbt_free(asrc);
475       xbt_free(adst);
476       xbt_free(arecv);
477     }else {
478       int *t = xbt_new(int, 1);
479       xbt_dynar_insert_at(srcs, i, t);
480       xbt_dynar_insert_at(dsts, i, t);
481       xbt_dynar_insert_at(recvs, i, t);
482       xbt_free(t);
483     }
484    }
485    int rank_traced = smpi_process_index();
486    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
487    extra->type = TRACING_WAITALL;
488    extra->send_size=count_requests;
489    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
490
491    smpi_mpi_waitall(count_requests, requests, status);
492
493    for (i = 0; i < count_requests; i++) {
494     int src_traced, dst_traced, is_wait_for_receive;
495     xbt_dynar_get_cpy(srcs, i, &src_traced);
496     xbt_dynar_get_cpy(dsts, i, &dst_traced);
497     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
498     if (is_wait_for_receive) {
499       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
500     }
501    }
502    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
503    //clean-up of dynars
504    xbt_dynar_free(&srcs);
505    xbt_dynar_free(&dsts);
506    xbt_dynar_free(&recvs);
507
508    //TODO xbt_dynar_free_container(get_reqq_self());
509    set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
510   }
511   log_timed_action (action, clock);
512 }
513
514 static void action_barrier(const char *const *action){
515   double clock = smpi_process_simulated_elapsed();
516   int rank = smpi_process_index();
517   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
518   extra->type = TRACING_BARRIER;
519   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
520
521   mpi_coll_barrier_fun(MPI_COMM_WORLD);
522
523   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
524   log_timed_action (action, clock);
525 }
526
527
528 static void action_bcast(const char *const *action)
529 {
530   CHECK_ACTION_PARAMS(action, 1, 2);
531   double size = parse_double(action[2]);
532   double clock = smpi_process_simulated_elapsed();
533   int root=0;
534   /*
535    * Initialize MPI_CURRENT_TYPE in order to decrease
536    * the number of the checks
537    * */
538   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
539
540   if(action[3]) {
541     root= atoi(action[3]);
542     if(action[4]) {
543       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
544     }
545   }
546
547   int rank = smpi_process_index();
548   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
549
550   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
551   extra->type = TRACING_BCAST;
552   extra->send_size = size;
553   extra->root = root_traced;
554   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
555   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
556   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
557
558   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
559
560   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
561   log_timed_action (action, clock);
562 }
563
564 static void action_reduce(const char *const *action)
565 {
566   CHECK_ACTION_PARAMS(action, 2, 2);
567   double comm_size = parse_double(action[2]);
568   double comp_size = parse_double(action[3]);
569   double clock = smpi_process_simulated_elapsed();
570   int root=0;
571   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
572
573   if(action[4]) {
574     root= atoi(action[4]);
575     if(action[5]) {
576       MPI_CURRENT_TYPE=decode_datatype(action[5]);
577     }
578   }
579   
580   
581
582   int rank = smpi_process_index();
583   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
584   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
585   extra->type = TRACING_REDUCE;
586   extra->send_size = comm_size;
587   extra->comp_size = comp_size;
588   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
589   extra->root = root_traced;
590
591   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
592
593   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
594   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
595    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
596    smpi_execute_flops(comp_size);
597
598   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
599   log_timed_action (action, clock);
600 }
601
602 static void action_allReduce(const char *const *action) {
603   CHECK_ACTION_PARAMS(action, 2, 1);
604   double comm_size = parse_double(action[2]);
605   double comp_size = parse_double(action[3]);
606
607   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
608   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
609
610   double clock = smpi_process_simulated_elapsed();
611   int rank = smpi_process_index();
612   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
613   extra->type = TRACING_ALLREDUCE;
614   extra->send_size = comm_size;
615   extra->comp_size = comp_size;
616   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
617   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
618
619   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
620   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
621   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
622   smpi_execute_flops(comp_size);
623
624   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
625   log_timed_action (action, clock);
626 }
627
628 static void action_allToAll(const char *const *action) {
629   double clock = smpi_process_simulated_elapsed();
630   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
631   int send_size = parse_double(action[2]);
632   int recv_size = parse_double(action[3]);
633   MPI_Datatype MPI_CURRENT_TYPE2;
634
635   if(action[4]) {
636     MPI_CURRENT_TYPE=decode_datatype(action[4]);
637     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
638   }
639   else {
640     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
641     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
642   }
643   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
644   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
645
646   int rank = smpi_process_index();
647   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
648   extra->type = TRACING_ALLTOALL;
649   extra->send_size = send_size;
650   extra->recv_size = recv_size;
651   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
652   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
653
654   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
655
656   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
657
658   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
659   log_timed_action (action, clock);
660 }
661
662
663 static void action_gather(const char *const *action) {
664   /*
665  The structure of the gather action for the rank 0 (total 4 processes) 
666  is the following:   
667  0 gather 68 68 0 0 0
668
669   where: 
670   1) 68 is the sendcounts
671   2) 68 is the recvcounts
672   3) 0 is the root node
673   4) 0 is the send datatype id, see decode_datatype()
674   5) 0 is the recv datatype id, see decode_datatype()
675   */
676   CHECK_ACTION_PARAMS(action, 2, 3);
677   double clock = smpi_process_simulated_elapsed();
678   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
679   int send_size = parse_double(action[2]);
680   int recv_size = parse_double(action[3]);
681   MPI_Datatype MPI_CURRENT_TYPE2;
682   if(action[5]) {
683     MPI_CURRENT_TYPE=decode_datatype(action[5]);
684     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
685   } else {
686     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
687     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
688   }
689   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
690   void *recv = NULL;
691   int root=0;
692   if(action[4])
693     root=atoi(action[4]);
694   int rank = smpi_comm_rank(MPI_COMM_WORLD);
695
696   if(rank==root)
697     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
698
699   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
700   extra->type = TRACING_GATHER;
701   extra->send_size = send_size;
702   extra->recv_size = recv_size;
703   extra->root = root;
704   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
705   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
706
707   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
708
709   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
710                 recv, recv_size, MPI_CURRENT_TYPE2,
711                 root, MPI_COMM_WORLD);
712
713   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
714   log_timed_action (action, clock);
715 }
716
717
718
719 static void action_gatherv(const char *const *action) {
720   /*
721  The structure of the gatherv action for the rank 0 (total 4 processes)
722  is the following:
723  0 gather 68 68 10 10 10 0 0 0
724
725   where:
726   1) 68 is the sendcount
727   2) 68 10 10 10 is the recvcounts
728   3) 0 is the root node
729   4) 0 is the send datatype id, see decode_datatype()
730   5) 0 is the recv datatype id, see decode_datatype()
731   */
732
733   double clock = smpi_process_simulated_elapsed();
734   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
735   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
736   int send_size = parse_double(action[2]);
737   int *disps = xbt_new0(int, comm_size);
738   int *recvcounts = xbt_new0(int, comm_size);
739   int i=0,recv_sum=0;
740
741   MPI_Datatype MPI_CURRENT_TYPE2;
742   if(action[4+comm_size]) {
743     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
744     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
745   } else {
746     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
747     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
748   }
749   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
750   void *recv = NULL;
751   for(i=0;i<comm_size;i++) {
752     recvcounts[i] = atoi(action[i+3]);
753     recv_sum=recv_sum+recvcounts[i];
754     disps[i] = 0;
755   }
756
757   int root=atoi(action[3+comm_size]);
758   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
759
760   if(rank==root)
761     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
762
763   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
764   extra->type = TRACING_GATHERV;
765   extra->send_size = send_size;
766   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
767   for(i=0; i< comm_size; i++)//copy data to avoid bad free
768     extra->recvcounts[i] = recvcounts[i];
769   extra->root = root;
770   extra->num_processes = comm_size;
771   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
772   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
773
774   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
775
776   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
777                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
778                 root, MPI_COMM_WORLD);
779
780   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
781   log_timed_action (action, clock);
782   xbt_free(recvcounts);
783   xbt_free(disps);
784 }
785
786 static void action_reducescatter(const char *const *action) {
787
788     /*
789  The structure of the reducescatter action for the rank 0 (total 4 processes) 
790  is the following:   
791 0 reduceScatter 275427 275427 275427 204020 11346849 0
792
793   where: 
794   1) The first four values after the name of the action declare the recvcounts array
795   2) The value 11346849 is the amount of instructions
796   3) The last value corresponds to the datatype, see decode_datatype().
797
798   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
799
800    */
801
802   double clock = smpi_process_simulated_elapsed();
803   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
804   CHECK_ACTION_PARAMS(action, comm_size+1, 1);
805   int comp_size = parse_double(action[2+comm_size]);
806   int *recvcounts = xbt_new0(int, comm_size);  
807   int *disps = xbt_new0(int, comm_size);  
808   int i=0;
809   int rank = smpi_process_index();
810   int size = 0;
811   if(action[3+comm_size])
812     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
813   else
814     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
815
816   for(i=0;i<comm_size;i++) {
817     recvcounts[i] = atoi(action[i+2]);
818     disps[i] = 0;
819     size+=recvcounts[i];
820   }
821
822   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
823   extra->type = TRACING_REDUCE_SCATTER;
824   extra->send_size = 0;
825   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
826   for(i=0; i< comm_size; i++)//copy data to avoid bad free
827     extra->recvcounts[i] = recvcounts[i];
828   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
829   extra->comp_size = comp_size;
830   extra->num_processes = comm_size;
831
832   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
833
834   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
835   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
836    
837    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
838        MPI_COMM_WORLD);
839    smpi_execute_flops(comp_size);
840
841
842   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
843   xbt_free(recvcounts);
844   xbt_free(disps);
845   log_timed_action (action, clock);
846 }
847
848 static void action_allgather(const char *const *action) {
849   /*
850  The structure of the allgather action for the rank 0 (total 4 processes) 
851  is the following:   
852   0 allGather 275427 275427
853
854   where: 
855   1) 275427 is the sendcount
856   2) 275427 is the recvcount
857   3) No more values mean that the datatype for sent and receive buffer
858   is the default one, see decode_datatype().
859
860    */
861
862   double clock = smpi_process_simulated_elapsed();
863
864   CHECK_ACTION_PARAMS(action, 2, 2);
865   int sendcount=atoi(action[2]); 
866   int recvcount=atoi(action[3]); 
867
868   MPI_Datatype MPI_CURRENT_TYPE2;
869
870   if(action[4]) {
871     MPI_CURRENT_TYPE = decode_datatype(action[3]);
872     MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
873   } else {
874     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
875     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
876   }
877   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
878   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
879
880   int rank = smpi_process_index();
881   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
882   extra->type = TRACING_ALLGATHER;
883   extra->send_size = sendcount;
884   extra->recv_size= recvcount;
885   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
886   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
887   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
888
889   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
890
891   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
892
893   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
894   log_timed_action (action, clock);
895 }
896
897 static void action_allgatherv(const char *const *action) {
898
899   /*
900  The structure of the allgatherv action for the rank 0 (total 4 processes) 
901  is the following:   
902 0 allGatherV 275427 275427 275427 275427 204020
903
904   where: 
905   1) 275427 is the sendcount
906   2) The next four elements declare the recvcounts array
907   3) No more values mean that the datatype for sent and receive buffer
908   is the default one, see decode_datatype().
909
910    */
911
912   double clock = smpi_process_simulated_elapsed();
913
914   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
915   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
916   int i=0;
917   int sendcount=atoi(action[2]);
918   int *recvcounts = xbt_new0(int, comm_size);  
919   int *disps = xbt_new0(int, comm_size);  
920   int recv_sum=0;  
921   MPI_Datatype MPI_CURRENT_TYPE2;
922
923   if(action[3+comm_size]) {
924     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
925     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
926   } else {
927     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
928     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
929   }
930   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
931
932   for(i=0;i<comm_size;i++) {
933     recvcounts[i] = atoi(action[i+3]);
934     recv_sum=recv_sum+recvcounts[i];
935   }
936   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
937
938   int rank = smpi_process_index();
939   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
940   extra->type = TRACING_ALLGATHERV;
941   extra->send_size = sendcount;
942   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
943   for(i=0; i< comm_size; i++)//copy data to avoid bad free
944     extra->recvcounts[i] = recvcounts[i];
945   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
946   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
947   extra->num_processes = comm_size;
948
949   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
950
951   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
952
953   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
954   log_timed_action (action, clock);
955   xbt_free(recvcounts);
956   xbt_free(disps);
957 }
958
959 static void action_allToAllv(const char *const *action) {
960   /*
961  The structure of the allToAllV action for the rank 0 (total 4 processes) 
962  is the following:   
963   0 allToAllV 100 1 7 10 12 100 1 70 10 5
964
965   where: 
966   1) 100 is the size of the send buffer *sizeof(int),
967   2) 1 7 10 12 is the sendcounts array
968   3) 100*sizeof(int) is the size of the receiver buffer
969   4)  1 70 10 5 is the recvcounts array
970
971    */
972
973
974   double clock = smpi_process_simulated_elapsed();
975
976   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
977   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
978   int send_buf_size=0,recv_buf_size=0,i=0;
979   int *sendcounts = xbt_new0(int, comm_size);  
980   int *recvcounts = xbt_new0(int, comm_size);  
981   int *senddisps = xbt_new0(int, comm_size);  
982   int *recvdisps = xbt_new0(int, comm_size);  
983
984   MPI_Datatype MPI_CURRENT_TYPE2;
985
986   send_buf_size=parse_double(action[2]);
987   recv_buf_size=parse_double(action[3+comm_size]);
988   if(action[4+2*comm_size]) {
989     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
990     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
991   }
992   else {
993       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
994       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
995   }
996
997   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
998   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
999
1000   for(i=0;i<comm_size;i++) {
1001     sendcounts[i] = atoi(action[i+3]);
1002     recvcounts[i] = atoi(action[i+4+comm_size]);
1003   }
1004
1005
1006   int rank = smpi_process_index();
1007   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1008   extra->type = TRACING_ALLTOALLV;
1009   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1010   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1011   extra->num_processes = comm_size;
1012
1013   for(i=0; i< comm_size; i++){//copy data to avoid bad free
1014     extra->send_size += sendcounts[i];
1015     extra->sendcounts[i] = sendcounts[i];
1016     extra->recv_size += recvcounts[i];
1017     extra->recvcounts[i] = recvcounts[i];
1018   }
1019   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1020   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1021
1022   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1023
1024   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1025                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1026                                MPI_COMM_WORLD);
1027
1028   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1029   log_timed_action (action, clock);
1030   xbt_free(sendcounts);
1031   xbt_free(recvcounts);
1032   xbt_free(senddisps);
1033   xbt_free(recvdisps);
1034 }
1035
1036 void smpi_replay_init(int *argc, char***argv){
1037   smpi_process_init(argc, argv);
1038   smpi_process_mark_as_initialized();
1039   smpi_process_set_replaying(1);
1040
1041   int rank = smpi_process_index();
1042   TRACE_smpi_init(rank);
1043   TRACE_smpi_computing_init(rank);
1044   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1045   extra->type = TRACING_INIT;
1046   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1047   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1048
1049   if (!_xbt_replay_action_init()) {
1050     xbt_replay_action_register("init",       action_init);
1051     xbt_replay_action_register("finalize",   action_finalize);
1052     xbt_replay_action_register("comm_size",  action_comm_size);
1053     xbt_replay_action_register("comm_split", action_comm_split);
1054     xbt_replay_action_register("comm_dup",   action_comm_dup);
1055     xbt_replay_action_register("send",       action_send);
1056     xbt_replay_action_register("Isend",      action_Isend);
1057     xbt_replay_action_register("recv",       action_recv);
1058     xbt_replay_action_register("Irecv",      action_Irecv);
1059     xbt_replay_action_register("test",       action_test);
1060     xbt_replay_action_register("wait",       action_wait);
1061     xbt_replay_action_register("waitAll",    action_waitall);
1062     xbt_replay_action_register("barrier",    action_barrier);
1063     xbt_replay_action_register("bcast",      action_bcast);
1064     xbt_replay_action_register("reduce",     action_reduce);
1065     xbt_replay_action_register("allReduce",  action_allReduce);
1066     xbt_replay_action_register("allToAll",   action_allToAll);
1067     xbt_replay_action_register("allToAllV",  action_allToAllv);
1068     xbt_replay_action_register("gather",  action_gather);
1069     xbt_replay_action_register("gatherV",  action_gatherv);
1070     xbt_replay_action_register("allGather",  action_allgather);
1071     xbt_replay_action_register("allGatherV",  action_allgatherv);
1072     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1073     xbt_replay_action_register("compute",    action_compute);
1074   }
1075   
1076   //if we have a delayed start, sleep here.
1077   if(*argc>2){
1078     char *endptr;
1079     double value = strtod((*argv)[2], &endptr);
1080     if (*endptr != '\0')
1081       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1082     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1083     smpi_execute_flops(value);
1084   } else {
1085     //UGLY done to force context switch to be sure that all MSG_processes begin initialization
1086     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
1087     smpi_execute_flops(0.0);
1088   }
1089   
1090   xbt_replay_action_runner(*argc, *argv);
1091 }
1092
1093 int smpi_replay_finalize(){
1094   double sim_time= 1.;
1095   /* One active process will stop. Decrease the counter*/
1096   XBT_DEBUG("There are %lu elements in reqq[*]",
1097             xbt_dynar_length(get_reqq_self()));
1098   if (!xbt_dynar_is_empty(get_reqq_self())){
1099     int count_requests=xbt_dynar_length(get_reqq_self());
1100     MPI_Request requests[count_requests];
1101     MPI_Status status[count_requests];
1102     unsigned int i;
1103
1104     xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1105     smpi_mpi_waitall(count_requests, requests, status);
1106     active_processes--;
1107   } else {
1108     active_processes--;
1109   }
1110
1111   if(!active_processes){
1112     /* Last process alive speaking */
1113     /* end the simulated timer */
1114     sim_time = smpi_process_simulated_elapsed();
1115   }
1116   
1117
1118   //TODO xbt_dynar_free_container(get_reqq_self()));
1119
1120   if(!active_processes){
1121     XBT_INFO("Simulation time %f", sim_time);
1122     _xbt_replay_action_exit();
1123     xbt_free(sendbuffer);
1124     xbt_free(recvbuffer);
1125     //xbt_free(reqq);
1126     xbt_dict_free(&reqq); //not need, data have been freed ???
1127     reqq = NULL;
1128   }
1129   
1130
1131   int rank = smpi_process_index();
1132   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1133   extra->type = TRACING_FINALIZE;
1134   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1135
1136   smpi_process_finalize();
1137
1138   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1139   TRACE_smpi_finalize(smpi_process_index());
1140   smpi_process_destroy();
1141   return MPI_SUCCESS;
1142 }