Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bdcd94895cd8e26715a368250dbbe1e43a8d383e
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15
16 int communicator_size = 0;
17 static int active_processes = 0;
18 xbt_dict_t reqq = NULL;
19
20 MPI_Datatype MPI_DEFAULT_TYPE;
21 MPI_Datatype MPI_CURRENT_TYPE;
22
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
27
28 static void log_timed_action (const char *const *action, double clock){
29   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30     char *name = xbt_str_join_array(action, " ");
31     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
32     free(name);
33   }
34 }
35
36
37 static xbt_dynar_t get_reqq_self(){
38    char * key;
39    
40    asprintf(&key, "%d", smpi_process_index());
41    xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
42    free(key);
43  
44    return dynar_mpi_request;
45 }
46
47 static void set_reqq_self(xbt_dynar_t mpi_request){
48    char * key;
49    
50    asprintf(&key, "%d", smpi_process_index());
51    xbt_dict_set(reqq, key, mpi_request, free);
52    free(key);
53 }
54
55
56 //allocate a single buffer for all sends, growing it if needed
57 void* smpi_get_tmp_sendbuffer(int size){
58   if (!smpi_process_get_replaying())
59         return xbt_malloc(size);
60   if (sendbuffer_size<size){
61     sendbuffer=xbt_realloc(sendbuffer,size);
62     sendbuffer_size=size;
63   }
64   return sendbuffer;
65 }
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (!smpi_process_get_replaying())
69         return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=xbt_realloc(recvbuffer,size);
72     recvbuffer_size=size;
73   }
74   return sendbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (!smpi_process_get_replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   double value;
86   char *endptr;
87   value = strtod(string, &endptr);
88   if (*endptr != '\0')
89     THROWF(unknown_error, 0, "%s is not a double", string);
90   return value;
91 }
92
93 static MPI_Datatype decode_datatype(const char *const action)
94 {
95 // Declared datatypes,
96
97   switch(atoi(action))
98   {
99     case 0:
100       MPI_CURRENT_TYPE=MPI_DOUBLE;
101       break;
102     case 1:
103       MPI_CURRENT_TYPE=MPI_INT;
104       break;
105     case 2:
106       MPI_CURRENT_TYPE=MPI_CHAR;
107       break;
108     case 3:
109       MPI_CURRENT_TYPE=MPI_SHORT;
110       break;
111     case 4:
112       MPI_CURRENT_TYPE=MPI_LONG;
113       break;
114     case 5:
115       MPI_CURRENT_TYPE=MPI_FLOAT;
116       break;
117     case 6:
118       MPI_CURRENT_TYPE=MPI_BYTE;
119       break;
120     default:
121       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
122
123   }
124    return MPI_CURRENT_TYPE;
125 }
126
127
128 const char* encode_datatype(MPI_Datatype datatype, int* known)
129 {
130
131   //default type for output is set to MPI_BYTE
132   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
133   if(known)*known=1;
134   if (datatype==MPI_BYTE){
135       return "";
136   }
137   if(datatype==MPI_DOUBLE)
138       return "0";
139   if(datatype==MPI_INT)
140       return "1";
141   if(datatype==MPI_CHAR)
142       return "2";
143   if(datatype==MPI_SHORT)
144       return "3";
145   if(datatype==MPI_LONG)
146     return "4";
147   if(datatype==MPI_FLOAT)
148       return "5";
149   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
150   if(known)*known=0;
151   // default - not implemented.
152   // do not warn here as we pass in this function even for other trace formats
153   return "-1";
154 }
155
156 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
157     int i=0;\
158     while(action[i]!=NULL)\
159      i++;\
160     if(i<mandatory+2)                                           \
161     THROWF(arg_error, 0, "%s replay failed.\n" \
162           "%d items were given on the line. First two should be process_id and action.  " \
163           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
164           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
165   }
166
167
168 static void action_init(const char *const *action)
169 {
170   int i;
171   XBT_DEBUG("Initialize the counters");
172   CHECK_ACTION_PARAMS(action, 0, 1);
173   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
174   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
175
176   /* start a simulated timer */
177   smpi_process_simulated_start();
178   /*initialize the number of active processes */
179   active_processes = smpi_process_count();
180
181   if (!reqq) {
182     reqq = xbt_dict_new();
183   }
184
185   set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
186
187   /*
188     reqq=xbt_new0(xbt_dynar_t,active_processes);
189
190     for(i=0;i<active_processes;i++){
191       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
192     }
193   }
194   */
195 }
196
197 static void action_finalize(const char *const *action)
198 {
199 }
200
201 static void action_comm_size(const char *const *action)
202 {
203   double clock = smpi_process_simulated_elapsed();
204
205   communicator_size = parse_double(action[2]);
206   log_timed_action (action, clock);
207 }
208
209 static void action_comm_split(const char *const *action)
210 {
211   double clock = smpi_process_simulated_elapsed();
212
213   log_timed_action (action, clock);
214 }
215
216 static void action_comm_dup(const char *const *action)
217 {
218   double clock = smpi_process_simulated_elapsed();
219
220   log_timed_action (action, clock);
221 }
222
223 static void action_compute(const char *const *action)
224 {
225   CHECK_ACTION_PARAMS(action, 1, 0);
226   double clock = smpi_process_simulated_elapsed();
227   double flops= parse_double(action[2]);
228   int rank = smpi_process_index();
229   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
230   extra->type=TRACING_COMPUTING;
231   extra->comp_size=flops;
232   TRACE_smpi_computing_in(rank, extra);
233
234   smpi_execute_flops(flops);
235
236   TRACE_smpi_computing_out(rank);
237   log_timed_action (action, clock);
238 }
239
240 static void action_send(const char *const *action)
241 {
242   CHECK_ACTION_PARAMS(action, 2, 1);
243   int to = atoi(action[2]);
244   double size=parse_double(action[3]);
245   double clock = smpi_process_simulated_elapsed();
246
247   if(action[4]) {
248     MPI_CURRENT_TYPE=decode_datatype(action[4]);
249   } else {
250     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
251   }
252
253   int rank = smpi_process_index();
254
255   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
256   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
257   extra->type = TRACING_SEND;
258   extra->send_size = size;
259   extra->src = rank;
260   extra->dst = dst_traced;
261   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
262   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
263   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
264
265   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
266
267   log_timed_action (action, clock);
268
269   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
270 }
271
272 static void action_Isend(const char *const *action)
273 {
274   CHECK_ACTION_PARAMS(action, 2, 1);
275   int to = atoi(action[2]);
276   double size=parse_double(action[3]);
277   double clock = smpi_process_simulated_elapsed();
278   MPI_Request request;
279
280   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
281   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
282
283   int rank = smpi_process_index();
284   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
285   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
286   extra->type = TRACING_ISEND;
287   extra->send_size = size;
288   extra->src = rank;
289   extra->dst = dst_traced;
290   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
291   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
292   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
293
294   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
295
296   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
297   request->send = 1;
298
299   xbt_dynar_push(get_reqq_self(),&request);
300
301   log_timed_action (action, clock);
302 }
303
304 static void action_recv(const char *const *action) {
305   CHECK_ACTION_PARAMS(action, 2, 1);
306   int from = atoi(action[2]);
307   double size=parse_double(action[3]);
308   double clock = smpi_process_simulated_elapsed();
309   MPI_Status status;
310
311   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
312   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
313
314   int rank = smpi_process_index();
315   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
316
317   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
318   extra->type = TRACING_RECV;
319   extra->send_size = size;
320   extra->src = src_traced;
321   extra->dst = rank;
322   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
323   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
324
325   //unknow size from the receiver pov
326   if(size==-1){
327       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
328       size=status.count;
329   }
330
331   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
332
333   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
334   TRACE_smpi_recv(rank, src_traced, rank);
335
336   log_timed_action (action, clock);
337 }
338
339 static void action_Irecv(const char *const *action)
340 {
341   CHECK_ACTION_PARAMS(action, 2, 1);
342   int from = atoi(action[2]);
343   double size=parse_double(action[3]);
344   double clock = smpi_process_simulated_elapsed();
345   MPI_Request request;
346
347   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
348   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
349
350   int rank = smpi_process_index();
351   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
352   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
353   extra->type = TRACING_IRECV;
354   extra->send_size = size;
355   extra->src = src_traced;
356   extra->dst = rank;
357   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
358   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
359   MPI_Status status;
360   //unknow size from the receiver pov
361   if(size==-1){
362       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
363       size=status.count;
364   }
365
366   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
367
368   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
369   request->recv = 1;
370   xbt_dynar_push(get_reqq_self(),&request);
371
372   log_timed_action (action, clock);
373 }
374
375 static void action_test(const char *const *action){
376   CHECK_ACTION_PARAMS(action, 0, 0);
377   double clock = smpi_process_simulated_elapsed();
378   MPI_Request request;
379   MPI_Status status;
380   int flag = TRUE;
381
382   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
383   //if request is null here, this may mean that a previous test has succeeded 
384   //Different times in traced application and replayed version may lead to this 
385   //In this case, ignore the extra calls.
386   if(request){
387           int rank = smpi_process_index();
388           instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
389           extra->type=TRACING_TEST;
390           TRACE_smpi_testing_in(rank, extra);
391
392           flag = smpi_mpi_test(&request, &status);
393
394           XBT_DEBUG("MPI_Test result: %d", flag);
395           /* push back request in dynar to be caught by a subsequent wait. if the test
396            * did succeed, the request is now NULL.
397            */
398           xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
399
400           TRACE_smpi_testing_out(rank);
401   }
402   log_timed_action (action, clock);
403 }
404
405 static void action_wait(const char *const *action){
406   CHECK_ACTION_PARAMS(action, 0, 0);
407   double clock = smpi_process_simulated_elapsed();
408   MPI_Request request;
409   MPI_Status status;
410
411   xbt_assert(xbt_dynar_length(get_reqq_self()),
412       "action wait not preceded by any irecv or isend: %s",
413       xbt_str_join_array(action," "));
414   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
415
416   if (!request){
417     /* Assuming that the trace is well formed, this mean the comm might have
418      * been caught by a MPI_test. Then just return.
419      */
420     return;
421   }
422
423   int rank = request->comm != MPI_COMM_NULL
424       ? smpi_comm_rank(request->comm)
425       : -1;
426
427   MPI_Group group = smpi_comm_group(request->comm);
428   int src_traced = smpi_group_rank(group, request->src);
429   int dst_traced = smpi_group_rank(group, request->dst);
430   int is_wait_for_receive = request->recv;
431   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
432   extra->type = TRACING_WAIT;
433   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
434
435   smpi_mpi_wait(&request, &status);
436
437   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
438   if (is_wait_for_receive)
439     TRACE_smpi_recv(rank, src_traced, dst_traced);
440   log_timed_action (action, clock);
441 }
442
443 static void action_waitall(const char *const *action){
444   CHECK_ACTION_PARAMS(action, 0, 0);
445   double clock = smpi_process_simulated_elapsed();
446   int count_requests=0;
447   unsigned int i=0;
448
449   count_requests=xbt_dynar_length(get_reqq_self());
450
451   if (count_requests>0) {
452     MPI_Request requests[count_requests];
453     MPI_Status status[count_requests];
454
455     /*  The reqq is an array of dynars. Its index corresponds to the rank.
456      Thus each rank saves its own requests to the array request. */
457     xbt_dynar_foreach(get_reqq_self(),i,requests[i]); 
458
459    //save information from requests
460
461    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
462    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
463    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
464    for (i = 0; i < count_requests; i++) {
465     if(requests[i]){
466       int *asrc = xbt_new(int, 1);
467       int *adst = xbt_new(int, 1);
468       int *arecv = xbt_new(int, 1);
469       *asrc = requests[i]->src;
470       *adst = requests[i]->dst;
471       *arecv = requests[i]->recv;
472       xbt_dynar_insert_at(srcs, i, asrc);
473       xbt_dynar_insert_at(dsts, i, adst);
474       xbt_dynar_insert_at(recvs, i, arecv);
475       xbt_free(asrc);
476       xbt_free(adst);
477       xbt_free(arecv);
478     }else {
479       int *t = xbt_new(int, 1);
480       xbt_dynar_insert_at(srcs, i, t);
481       xbt_dynar_insert_at(dsts, i, t);
482       xbt_dynar_insert_at(recvs, i, t);
483       xbt_free(t);
484     }
485    }
486    int rank_traced = smpi_process_index();
487    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
488    extra->type = TRACING_WAITALL;
489    extra->send_size=count_requests;
490    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
491
492    smpi_mpi_waitall(count_requests, requests, status);
493
494    for (i = 0; i < count_requests; i++) {
495     int src_traced, dst_traced, is_wait_for_receive;
496     xbt_dynar_get_cpy(srcs, i, &src_traced);
497     xbt_dynar_get_cpy(dsts, i, &dst_traced);
498     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
499     if (is_wait_for_receive) {
500       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
501     }
502    }
503    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
504    //clean-up of dynars
505    xbt_dynar_free(&srcs);
506    xbt_dynar_free(&dsts);
507    xbt_dynar_free(&recvs);
508
509    //TODO xbt_dynar_free_container(get_reqq_self());
510    set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
511   }
512   log_timed_action (action, clock);
513 }
514
515 static void action_barrier(const char *const *action){
516   double clock = smpi_process_simulated_elapsed();
517   int rank = smpi_process_index();
518   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
519   extra->type = TRACING_BARRIER;
520   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
521
522   mpi_coll_barrier_fun(MPI_COMM_WORLD);
523
524   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
525   log_timed_action (action, clock);
526 }
527
528
529 static void action_bcast(const char *const *action)
530 {
531   CHECK_ACTION_PARAMS(action, 1, 2);
532   double size = parse_double(action[2]);
533   double clock = smpi_process_simulated_elapsed();
534   int root=0;
535   /*
536    * Initialize MPI_CURRENT_TYPE in order to decrease
537    * the number of the checks
538    * */
539   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
540
541   if(action[3]) {
542     root= atoi(action[3]);
543     if(action[4]) {
544       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
545     }
546   }
547
548   int rank = smpi_process_index();
549   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
550
551   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
552   extra->type = TRACING_BCAST;
553   extra->send_size = size;
554   extra->root = root_traced;
555   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
556   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
557   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
558
559   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
560
561   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
562   log_timed_action (action, clock);
563 }
564
565 static void action_reduce(const char *const *action)
566 {
567   CHECK_ACTION_PARAMS(action, 2, 2);
568   double comm_size = parse_double(action[2]);
569   double comp_size = parse_double(action[3]);
570   double clock = smpi_process_simulated_elapsed();
571   int root=0;
572   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
573
574   if(action[4]) {
575     root= atoi(action[4]);
576     if(action[5]) {
577       MPI_CURRENT_TYPE=decode_datatype(action[5]);
578     }
579   }
580   
581   
582
583   int rank = smpi_process_index();
584   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
585   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
586   extra->type = TRACING_REDUCE;
587   extra->send_size = comm_size;
588   extra->comp_size = comp_size;
589   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
590   extra->root = root_traced;
591
592   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
593
594   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
595   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
596    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
597    smpi_execute_flops(comp_size);
598
599   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
600   log_timed_action (action, clock);
601 }
602
603 static void action_allReduce(const char *const *action) {
604   CHECK_ACTION_PARAMS(action, 2, 1);
605   double comm_size = parse_double(action[2]);
606   double comp_size = parse_double(action[3]);
607
608   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
609   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
610
611   double clock = smpi_process_simulated_elapsed();
612   int rank = smpi_process_index();
613   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
614   extra->type = TRACING_ALLREDUCE;
615   extra->send_size = comm_size;
616   extra->comp_size = comp_size;
617   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
618   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
619
620   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
621   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
622   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
623   smpi_execute_flops(comp_size);
624
625   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
626   log_timed_action (action, clock);
627 }
628
629 static void action_allToAll(const char *const *action) {
630   double clock = smpi_process_simulated_elapsed();
631   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
632   int send_size = parse_double(action[2]);
633   int recv_size = parse_double(action[3]);
634   MPI_Datatype MPI_CURRENT_TYPE2;
635
636   if(action[4]) {
637     MPI_CURRENT_TYPE=decode_datatype(action[4]);
638     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
639   }
640   else {
641     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
642     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
643   }
644   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
645   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
646
647   int rank = smpi_process_index();
648   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
649   extra->type = TRACING_ALLTOALL;
650   extra->send_size = send_size;
651   extra->recv_size = recv_size;
652   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
653   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
654
655   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
656
657   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
658
659   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
660   log_timed_action (action, clock);
661 }
662
663
664 static void action_gather(const char *const *action) {
665   /*
666  The structure of the gather action for the rank 0 (total 4 processes) 
667  is the following:   
668  0 gather 68 68 0 0 0
669
670   where: 
671   1) 68 is the sendcounts
672   2) 68 is the recvcounts
673   3) 0 is the root node
674   4) 0 is the send datatype id, see decode_datatype()
675   5) 0 is the recv datatype id, see decode_datatype()
676   */
677   CHECK_ACTION_PARAMS(action, 2, 3);
678   double clock = smpi_process_simulated_elapsed();
679   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
680   int send_size = parse_double(action[2]);
681   int recv_size = parse_double(action[3]);
682   MPI_Datatype MPI_CURRENT_TYPE2;
683   if(action[5]) {
684     MPI_CURRENT_TYPE=decode_datatype(action[5]);
685     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
686   } else {
687     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
688     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
689   }
690   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
691   void *recv = NULL;
692   int root=0;
693   if(action[4])
694     root=atoi(action[4]);
695   int rank = smpi_comm_rank(MPI_COMM_WORLD);
696
697   if(rank==root)
698     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
699
700   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
701   extra->type = TRACING_GATHER;
702   extra->send_size = send_size;
703   extra->recv_size = recv_size;
704   extra->root = root;
705   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
706   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
707
708   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
709
710   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
711                 recv, recv_size, MPI_CURRENT_TYPE2,
712                 root, MPI_COMM_WORLD);
713
714   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
715   log_timed_action (action, clock);
716 }
717
718
719
720 static void action_gatherv(const char *const *action) {
721   /*
722  The structure of the gatherv action for the rank 0 (total 4 processes)
723  is the following:
724  0 gather 68 68 10 10 10 0 0 0
725
726   where:
727   1) 68 is the sendcount
728   2) 68 10 10 10 is the recvcounts
729   3) 0 is the root node
730   4) 0 is the send datatype id, see decode_datatype()
731   5) 0 is the recv datatype id, see decode_datatype()
732   */
733
734   double clock = smpi_process_simulated_elapsed();
735   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
736   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
737   int send_size = parse_double(action[2]);
738   int *disps = xbt_new0(int, comm_size);
739   int *recvcounts = xbt_new0(int, comm_size);
740   int i=0,recv_sum=0;
741
742   MPI_Datatype MPI_CURRENT_TYPE2;
743   if(action[4+comm_size]) {
744     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
745     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
746   } else {
747     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
748     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
749   }
750   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
751   void *recv = NULL;
752   for(i=0;i<comm_size;i++) {
753     recvcounts[i] = atoi(action[i+3]);
754     recv_sum=recv_sum+recvcounts[i];
755     disps[i] = 0;
756   }
757
758   int root=atoi(action[3+comm_size]);
759   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
760
761   if(rank==root)
762     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
763
764   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
765   extra->type = TRACING_GATHERV;
766   extra->send_size = send_size;
767   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
768   for(i=0; i< comm_size; i++)//copy data to avoid bad free
769     extra->recvcounts[i] = recvcounts[i];
770   extra->root = root;
771   extra->num_processes = comm_size;
772   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
773   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
774
775   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
776
777   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
778                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
779                 root, MPI_COMM_WORLD);
780
781   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
782   log_timed_action (action, clock);
783   xbt_free(recvcounts);
784   xbt_free(disps);
785 }
786
787 static void action_reducescatter(const char *const *action) {
788
789     /*
790  The structure of the reducescatter action for the rank 0 (total 4 processes) 
791  is the following:   
792 0 reduceScatter 275427 275427 275427 204020 11346849 0
793
794   where: 
795   1) The first four values after the name of the action declare the recvcounts array
796   2) The value 11346849 is the amount of instructions
797   3) The last value corresponds to the datatype, see decode_datatype().
798
799   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
800
801    */
802
803   double clock = smpi_process_simulated_elapsed();
804   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
805   CHECK_ACTION_PARAMS(action, comm_size+1, 1);
806   int comp_size = parse_double(action[2+comm_size]);
807   int *recvcounts = xbt_new0(int, comm_size);  
808   int *disps = xbt_new0(int, comm_size);  
809   int i=0;
810   int rank = smpi_process_index();
811   int size = 0;
812   if(action[3+comm_size])
813     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
814   else
815     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
816
817   for(i=0;i<comm_size;i++) {
818     recvcounts[i] = atoi(action[i+2]);
819     disps[i] = 0;
820     size+=recvcounts[i];
821   }
822
823   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
824   extra->type = TRACING_REDUCE_SCATTER;
825   extra->send_size = 0;
826   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
827   for(i=0; i< comm_size; i++)//copy data to avoid bad free
828     extra->recvcounts[i] = recvcounts[i];
829   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
830   extra->comp_size = comp_size;
831   extra->num_processes = comm_size;
832
833   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
834
835   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
836   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
837    
838    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
839        MPI_COMM_WORLD);
840    smpi_execute_flops(comp_size);
841
842
843   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
844   xbt_free(recvcounts);
845   xbt_free(disps);
846   log_timed_action (action, clock);
847 }
848
849 static void action_allgather(const char *const *action) {
850   /*
851  The structure of the allgather action for the rank 0 (total 4 processes) 
852  is the following:   
853   0 allGather 275427 275427
854
855   where: 
856   1) 275427 is the sendcount
857   2) 275427 is the recvcount
858   3) No more values mean that the datatype for sent and receive buffer
859   is the default one, see decode_datatype().
860
861    */
862
863   double clock = smpi_process_simulated_elapsed();
864
865   CHECK_ACTION_PARAMS(action, 2, 2);
866   int sendcount=atoi(action[2]); 
867   int recvcount=atoi(action[3]); 
868
869   MPI_Datatype MPI_CURRENT_TYPE2;
870
871   if(action[4]) {
872     MPI_CURRENT_TYPE = decode_datatype(action[3]);
873     MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
874   } else {
875     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
876     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
877   }
878   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
879   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
880
881   int rank = smpi_process_index();
882   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
883   extra->type = TRACING_ALLGATHER;
884   extra->send_size = sendcount;
885   extra->recv_size= recvcount;
886   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
887   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
888   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
889
890   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
891
892   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
893
894   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
895   log_timed_action (action, clock);
896 }
897
898 static void action_allgatherv(const char *const *action) {
899
900   /*
901  The structure of the allgatherv action for the rank 0 (total 4 processes) 
902  is the following:   
903 0 allGatherV 275427 275427 275427 275427 204020
904
905   where: 
906   1) 275427 is the sendcount
907   2) The next four elements declare the recvcounts array
908   3) No more values mean that the datatype for sent and receive buffer
909   is the default one, see decode_datatype().
910
911    */
912
913   double clock = smpi_process_simulated_elapsed();
914
915   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
916   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
917   int i=0;
918   int sendcount=atoi(action[2]);
919   int *recvcounts = xbt_new0(int, comm_size);  
920   int *disps = xbt_new0(int, comm_size);  
921   int recv_sum=0;  
922   MPI_Datatype MPI_CURRENT_TYPE2;
923
924   if(action[3+comm_size]) {
925     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
926     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
927   } else {
928     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
929     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
930   }
931   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
932
933   for(i=0;i<comm_size;i++) {
934     recvcounts[i] = atoi(action[i+3]);
935     recv_sum=recv_sum+recvcounts[i];
936   }
937   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
938
939   int rank = smpi_process_index();
940   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
941   extra->type = TRACING_ALLGATHERV;
942   extra->send_size = sendcount;
943   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
944   for(i=0; i< comm_size; i++)//copy data to avoid bad free
945     extra->recvcounts[i] = recvcounts[i];
946   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
947   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
948   extra->num_processes = comm_size;
949
950   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
951
952   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
953
954   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
955   log_timed_action (action, clock);
956   xbt_free(recvcounts);
957   xbt_free(disps);
958 }
959
960 static void action_allToAllv(const char *const *action) {
961   /*
962  The structure of the allToAllV action for the rank 0 (total 4 processes) 
963  is the following:   
964   0 allToAllV 100 1 7 10 12 100 1 70 10 5
965
966   where: 
967   1) 100 is the size of the send buffer *sizeof(int),
968   2) 1 7 10 12 is the sendcounts array
969   3) 100*sizeof(int) is the size of the receiver buffer
970   4)  1 70 10 5 is the recvcounts array
971
972    */
973
974
975   double clock = smpi_process_simulated_elapsed();
976
977   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
978   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
979   int send_buf_size=0,recv_buf_size=0,i=0;
980   int *sendcounts = xbt_new0(int, comm_size);  
981   int *recvcounts = xbt_new0(int, comm_size);  
982   int *senddisps = xbt_new0(int, comm_size);  
983   int *recvdisps = xbt_new0(int, comm_size);  
984
985   MPI_Datatype MPI_CURRENT_TYPE2;
986
987   send_buf_size=parse_double(action[2]);
988   recv_buf_size=parse_double(action[3+comm_size]);
989   if(action[4+2*comm_size]) {
990     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
991     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
992   }
993   else {
994       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
995       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
996   }
997
998   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
999   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1000
1001   for(i=0;i<comm_size;i++) {
1002     sendcounts[i] = atoi(action[i+3]);
1003     recvcounts[i] = atoi(action[i+4+comm_size]);
1004   }
1005
1006
1007   int rank = smpi_process_index();
1008   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1009   extra->type = TRACING_ALLTOALLV;
1010   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1011   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1012   extra->num_processes = comm_size;
1013
1014   for(i=0; i< comm_size; i++){//copy data to avoid bad free
1015     extra->send_size += sendcounts[i];
1016     extra->sendcounts[i] = sendcounts[i];
1017     extra->recv_size += recvcounts[i];
1018     extra->recvcounts[i] = recvcounts[i];
1019   }
1020   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1021   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1022
1023   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1024
1025   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1026                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1027                                MPI_COMM_WORLD);
1028
1029   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1030   log_timed_action (action, clock);
1031   xbt_free(sendcounts);
1032   xbt_free(recvcounts);
1033   xbt_free(senddisps);
1034   xbt_free(recvdisps);
1035 }
1036
1037 void smpi_replay_init(int *argc, char***argv){
1038   smpi_process_init(argc, argv);
1039   smpi_process_mark_as_initialized();
1040   smpi_process_set_replaying(1);
1041
1042   int rank = smpi_process_index();
1043   TRACE_smpi_init(rank);
1044   TRACE_smpi_computing_init(rank);
1045   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1046   extra->type = TRACING_INIT;
1047   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1048   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1049
1050   if (!action_funs){
1051     _xbt_replay_action_init();
1052     xbt_replay_action_register("init",       action_init);
1053     xbt_replay_action_register("finalize",   action_finalize);
1054     xbt_replay_action_register("comm_size",  action_comm_size);
1055     xbt_replay_action_register("comm_split", action_comm_split);
1056     xbt_replay_action_register("comm_dup",   action_comm_dup);
1057     xbt_replay_action_register("send",       action_send);
1058     xbt_replay_action_register("Isend",      action_Isend);
1059     xbt_replay_action_register("recv",       action_recv);
1060     xbt_replay_action_register("Irecv",      action_Irecv);
1061     xbt_replay_action_register("test",       action_test);
1062     xbt_replay_action_register("wait",       action_wait);
1063     xbt_replay_action_register("waitAll",    action_waitall);
1064     xbt_replay_action_register("barrier",    action_barrier);
1065     xbt_replay_action_register("bcast",      action_bcast);
1066     xbt_replay_action_register("reduce",     action_reduce);
1067     xbt_replay_action_register("allReduce",  action_allReduce);
1068     xbt_replay_action_register("allToAll",   action_allToAll);
1069     xbt_replay_action_register("allToAllV",  action_allToAllv);
1070     xbt_replay_action_register("gather",  action_gather);
1071     xbt_replay_action_register("gatherV",  action_gatherv);
1072     xbt_replay_action_register("allGather",  action_allgather);
1073     xbt_replay_action_register("allGatherV",  action_allgatherv);
1074     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1075     xbt_replay_action_register("compute",    action_compute);
1076   }
1077   
1078   //if we have a delayed start, sleep here.
1079   if(*argc>2){
1080     char *endptr;
1081     double value = strtod((*argv)[2], &endptr);
1082     if (*endptr != '\0')
1083       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1084     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1085     smpi_execute_flops(value);
1086   } else {
1087     //UGLY done to force context switch to be sure that all MSG_processes begin initialization
1088     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
1089     smpi_execute_flops(0.0);
1090   }
1091   
1092   xbt_replay_action_runner(*argc, *argv);
1093 }
1094
1095 int smpi_replay_finalize(){
1096   double sim_time= 1.;
1097   /* One active process will stop. Decrease the counter*/
1098   XBT_DEBUG("There are %lu elements in reqq[*]",
1099             xbt_dynar_length(get_reqq_self()));
1100   if (!xbt_dynar_is_empty(get_reqq_self())){
1101     int count_requests=xbt_dynar_length(get_reqq_self());
1102     MPI_Request requests[count_requests];
1103     MPI_Status status[count_requests];
1104     unsigned int i;
1105
1106     xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1107     smpi_mpi_waitall(count_requests, requests, status);
1108     active_processes--;
1109   } else {
1110     active_processes--;
1111   }
1112
1113   if(!active_processes){
1114     /* Last process alive speaking */
1115     /* end the simulated timer */
1116     sim_time = smpi_process_simulated_elapsed();
1117   }
1118   
1119
1120   //TODO xbt_dynar_free_container(get_reqq_self()));
1121
1122   if(!active_processes){
1123     XBT_INFO("Simulation time %f", sim_time);
1124     _xbt_replay_action_exit();
1125     xbt_free(sendbuffer);
1126     xbt_free(recvbuffer);
1127     //xbt_free(reqq);
1128     xbt_dict_free(&reqq); //not need, data have been freed ???
1129     reqq = NULL;
1130   }
1131   
1132
1133   int rank = smpi_process_index();
1134   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1135   extra->type = TRACING_FINALIZE;
1136   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1137
1138   smpi_process_finalize();
1139
1140   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1141   TRACE_smpi_finalize(smpi_process_index());
1142   smpi_process_destroy();
1143   return MPI_SUCCESS;
1144 }