Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[travis] detect linux as we should
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15
16 int communicator_size = 0;
17 static int active_processes = 0;
18 xbt_dict_t reqq = NULL;
19
20 MPI_Datatype MPI_DEFAULT_TYPE;
21 MPI_Datatype MPI_CURRENT_TYPE;
22
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
27
28 static void log_timed_action (const char *const *action, double clock){
29   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30     char *name = xbt_str_join_array(action, " ");
31     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
32     free(name);
33   }
34 }
35
36
37 static xbt_dynar_t get_reqq_self(){
38    char * key;
39    
40    int size = asprintf(&key, "%d", smpi_process_index());
41    if(size==-1)
42      xbt_die("could not allocate memory for asprintf");
43    xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
44    free(key);
45  
46    return dynar_mpi_request;
47 }
48
49 static void set_reqq_self(xbt_dynar_t mpi_request){
50    char * key;
51    
52    int size = asprintf(&key, "%d", smpi_process_index());
53    if(size==-1)
54      xbt_die("could not allocate memory for asprintf");
55    xbt_dict_set(reqq, key, mpi_request, free);
56    free(key);
57 }
58
59
60 //allocate a single buffer for all sends, growing it if needed
61 void* smpi_get_tmp_sendbuffer(int size){
62   if (!smpi_process_get_replaying())
63         return xbt_malloc(size);
64   if (sendbuffer_size<size){
65     sendbuffer=xbt_realloc(sendbuffer,size);
66     sendbuffer_size=size;
67   }
68   return sendbuffer;
69 }
70 //allocate a single buffer for all recv
71 void* smpi_get_tmp_recvbuffer(int size){
72   if (!smpi_process_get_replaying())
73         return xbt_malloc(size);
74   if (recvbuffer_size<size){
75     recvbuffer=xbt_realloc(recvbuffer,size);
76     recvbuffer_size=size;
77   }
78   return sendbuffer;
79 }
80
81 void smpi_free_tmp_buffer(void* buf){
82   if (!smpi_process_get_replaying())
83     xbt_free(buf);
84 }
85
86 /* Helper function */
87 static double parse_double(const char *string)
88 {
89   double value;
90   char *endptr;
91   value = strtod(string, &endptr);
92   if (*endptr != '\0')
93     THROWF(unknown_error, 0, "%s is not a double", string);
94   return value;
95 }
96
97 static MPI_Datatype decode_datatype(const char *const action)
98 {
99 // Declared datatypes,
100
101   switch(atoi(action))
102   {
103     case 0:
104       MPI_CURRENT_TYPE=MPI_DOUBLE;
105       break;
106     case 1:
107       MPI_CURRENT_TYPE=MPI_INT;
108       break;
109     case 2:
110       MPI_CURRENT_TYPE=MPI_CHAR;
111       break;
112     case 3:
113       MPI_CURRENT_TYPE=MPI_SHORT;
114       break;
115     case 4:
116       MPI_CURRENT_TYPE=MPI_LONG;
117       break;
118     case 5:
119       MPI_CURRENT_TYPE=MPI_FLOAT;
120       break;
121     case 6:
122       MPI_CURRENT_TYPE=MPI_BYTE;
123       break;
124     default:
125       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
126
127   }
128    return MPI_CURRENT_TYPE;
129 }
130
131
132 const char* encode_datatype(MPI_Datatype datatype, int* known)
133 {
134
135   //default type for output is set to MPI_BYTE
136   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
137   if(known)*known=1;
138   if (datatype==MPI_BYTE){
139       return "";
140   }
141   if(datatype==MPI_DOUBLE)
142       return "0";
143   if(datatype==MPI_INT)
144       return "1";
145   if(datatype==MPI_CHAR)
146       return "2";
147   if(datatype==MPI_SHORT)
148       return "3";
149   if(datatype==MPI_LONG)
150     return "4";
151   if(datatype==MPI_FLOAT)
152       return "5";
153   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
154   if(known)*known=0;
155   // default - not implemented.
156   // do not warn here as we pass in this function even for other trace formats
157   return "-1";
158 }
159
160 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
161     int i=0;\
162     while(action[i]!=NULL)\
163      i++;\
164     if(i<mandatory+2)                                           \
165     THROWF(arg_error, 0, "%s replay failed.\n" \
166           "%d items were given on the line. First two should be process_id and action.  " \
167           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
168           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
169   }
170
171
172 static void action_init(const char *const *action)
173 {
174   XBT_DEBUG("Initialize the counters");
175   CHECK_ACTION_PARAMS(action, 0, 1);
176   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
177   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
178
179   /* start a simulated timer */
180   smpi_process_simulated_start();
181   /*initialize the number of active processes */
182   active_processes = smpi_process_count();
183
184   if (!reqq) {
185     reqq = xbt_dict_new();
186   }
187
188   set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
189
190   /*
191     reqq=xbt_new0(xbt_dynar_t,active_processes);
192
193     for(i=0;i<active_processes;i++){
194       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
195     }
196   }
197   */
198 }
199
200 static void action_finalize(const char *const *action)
201 {
202 }
203
204 static void action_comm_size(const char *const *action)
205 {
206   double clock = smpi_process_simulated_elapsed();
207
208   communicator_size = parse_double(action[2]);
209   log_timed_action (action, clock);
210 }
211
212 static void action_comm_split(const char *const *action)
213 {
214   double clock = smpi_process_simulated_elapsed();
215
216   log_timed_action (action, clock);
217 }
218
219 static void action_comm_dup(const char *const *action)
220 {
221   double clock = smpi_process_simulated_elapsed();
222
223   log_timed_action (action, clock);
224 }
225
226 static void action_compute(const char *const *action)
227 {
228   CHECK_ACTION_PARAMS(action, 1, 0);
229   double clock = smpi_process_simulated_elapsed();
230   double flops= parse_double(action[2]);
231   int rank = smpi_process_index();
232   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233   extra->type=TRACING_COMPUTING;
234   extra->comp_size=flops;
235   TRACE_smpi_computing_in(rank, extra);
236
237   smpi_execute_flops(flops);
238
239   TRACE_smpi_computing_out(rank);
240   log_timed_action (action, clock);
241 }
242
243 static void action_send(const char *const *action)
244 {
245   CHECK_ACTION_PARAMS(action, 2, 1);
246   int to = atoi(action[2]);
247   double size=parse_double(action[3]);
248   double clock = smpi_process_simulated_elapsed();
249
250   if(action[4]) {
251     MPI_CURRENT_TYPE=decode_datatype(action[4]);
252   } else {
253     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
254   }
255
256   int rank = smpi_process_index();
257
258   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
259   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
260   extra->type = TRACING_SEND;
261   extra->send_size = size;
262   extra->src = rank;
263   extra->dst = dst_traced;
264   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
265   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
266   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
267
268   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
269
270   log_timed_action (action, clock);
271
272   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
273 }
274
275 static void action_Isend(const char *const *action)
276 {
277   CHECK_ACTION_PARAMS(action, 2, 1);
278   int to = atoi(action[2]);
279   double size=parse_double(action[3]);
280   double clock = smpi_process_simulated_elapsed();
281   MPI_Request request;
282
283   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
284   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
285
286   int rank = smpi_process_index();
287   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
288   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
289   extra->type = TRACING_ISEND;
290   extra->send_size = size;
291   extra->src = rank;
292   extra->dst = dst_traced;
293   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
294   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
295   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
296
297   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
298
299   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
300   request->send = 1;
301
302   xbt_dynar_push(get_reqq_self(),&request);
303
304   log_timed_action (action, clock);
305 }
306
307 static void action_recv(const char *const *action) {
308   CHECK_ACTION_PARAMS(action, 2, 1);
309   int from = atoi(action[2]);
310   double size=parse_double(action[3]);
311   double clock = smpi_process_simulated_elapsed();
312   MPI_Status status;
313
314   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
315   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
316
317   int rank = smpi_process_index();
318   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
319
320   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
321   extra->type = TRACING_RECV;
322   extra->send_size = size;
323   extra->src = src_traced;
324   extra->dst = rank;
325   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
326   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
327
328   //unknow size from the receiver pov
329   if(size==-1){
330       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
331       size=status.count;
332   }
333
334   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
335
336   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
337   TRACE_smpi_recv(rank, src_traced, rank);
338
339   log_timed_action (action, clock);
340 }
341
342 static void action_Irecv(const char *const *action)
343 {
344   CHECK_ACTION_PARAMS(action, 2, 1);
345   int from = atoi(action[2]);
346   double size=parse_double(action[3]);
347   double clock = smpi_process_simulated_elapsed();
348   MPI_Request request;
349
350   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
351   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
352
353   int rank = smpi_process_index();
354   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
355   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
356   extra->type = TRACING_IRECV;
357   extra->send_size = size;
358   extra->src = src_traced;
359   extra->dst = rank;
360   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
361   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
362   MPI_Status status;
363   //unknow size from the receiver pov
364   if(size==-1){
365       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
366       size=status.count;
367   }
368
369   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
370
371   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
372   request->recv = 1;
373   xbt_dynar_push(get_reqq_self(),&request);
374
375   log_timed_action (action, clock);
376 }
377
378 static void action_test(const char *const *action){
379   CHECK_ACTION_PARAMS(action, 0, 0);
380   double clock = smpi_process_simulated_elapsed();
381   MPI_Request request;
382   MPI_Status status;
383   int flag = TRUE;
384
385   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
386   //if request is null here, this may mean that a previous test has succeeded 
387   //Different times in traced application and replayed version may lead to this 
388   //In this case, ignore the extra calls.
389   if(request){
390           int rank = smpi_process_index();
391           instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
392           extra->type=TRACING_TEST;
393           TRACE_smpi_testing_in(rank, extra);
394
395           flag = smpi_mpi_test(&request, &status);
396
397           XBT_DEBUG("MPI_Test result: %d", flag);
398           /* push back request in dynar to be caught by a subsequent wait. if the test
399            * did succeed, the request is now NULL.
400            */
401           xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
402
403           TRACE_smpi_testing_out(rank);
404   }
405   log_timed_action (action, clock);
406 }
407
408 static void action_wait(const char *const *action){
409   CHECK_ACTION_PARAMS(action, 0, 0);
410   double clock = smpi_process_simulated_elapsed();
411   MPI_Request request;
412   MPI_Status status;
413
414   xbt_assert(xbt_dynar_length(get_reqq_self()),
415       "action wait not preceded by any irecv or isend: %s",
416       xbt_str_join_array(action," "));
417   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
418
419   if (!request){
420     /* Assuming that the trace is well formed, this mean the comm might have
421      * been caught by a MPI_test. Then just return.
422      */
423     return;
424   }
425
426   int rank = request->comm != MPI_COMM_NULL
427       ? smpi_comm_rank(request->comm)
428       : -1;
429
430   MPI_Group group = smpi_comm_group(request->comm);
431   int src_traced = smpi_group_rank(group, request->src);
432   int dst_traced = smpi_group_rank(group, request->dst);
433   int is_wait_for_receive = request->recv;
434   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
435   extra->type = TRACING_WAIT;
436   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
437
438   smpi_mpi_wait(&request, &status);
439
440   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
441   if (is_wait_for_receive)
442     TRACE_smpi_recv(rank, src_traced, dst_traced);
443   log_timed_action (action, clock);
444 }
445
446 static void action_waitall(const char *const *action){
447   CHECK_ACTION_PARAMS(action, 0, 0);
448   double clock = smpi_process_simulated_elapsed();
449   int count_requests=0;
450   unsigned int i=0;
451
452   count_requests=xbt_dynar_length(get_reqq_self());
453
454   if (count_requests>0) {
455     MPI_Request requests[count_requests];
456     MPI_Status status[count_requests];
457
458     /*  The reqq is an array of dynars. Its index corresponds to the rank.
459      Thus each rank saves its own requests to the array request. */
460     xbt_dynar_foreach(get_reqq_self(),i,requests[i]); 
461
462    //save information from requests
463
464    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
465    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
466    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
467    for (i = 0; i < count_requests; i++) {
468     if(requests[i]){
469       int *asrc = xbt_new(int, 1);
470       int *adst = xbt_new(int, 1);
471       int *arecv = xbt_new(int, 1);
472       *asrc = requests[i]->src;
473       *adst = requests[i]->dst;
474       *arecv = requests[i]->recv;
475       xbt_dynar_insert_at(srcs, i, asrc);
476       xbt_dynar_insert_at(dsts, i, adst);
477       xbt_dynar_insert_at(recvs, i, arecv);
478       xbt_free(asrc);
479       xbt_free(adst);
480       xbt_free(arecv);
481     }else {
482       int *t = xbt_new(int, 1);
483       xbt_dynar_insert_at(srcs, i, t);
484       xbt_dynar_insert_at(dsts, i, t);
485       xbt_dynar_insert_at(recvs, i, t);
486       xbt_free(t);
487     }
488    }
489    int rank_traced = smpi_process_index();
490    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
491    extra->type = TRACING_WAITALL;
492    extra->send_size=count_requests;
493    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
494
495    smpi_mpi_waitall(count_requests, requests, status);
496
497    for (i = 0; i < count_requests; i++) {
498     int src_traced, dst_traced, is_wait_for_receive;
499     xbt_dynar_get_cpy(srcs, i, &src_traced);
500     xbt_dynar_get_cpy(dsts, i, &dst_traced);
501     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
502     if (is_wait_for_receive) {
503       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
504     }
505    }
506    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
507    //clean-up of dynars
508    xbt_dynar_free(&srcs);
509    xbt_dynar_free(&dsts);
510    xbt_dynar_free(&recvs);
511
512    //TODO xbt_dynar_free_container(get_reqq_self());
513    set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
514   }
515   log_timed_action (action, clock);
516 }
517
518 static void action_barrier(const char *const *action){
519   double clock = smpi_process_simulated_elapsed();
520   int rank = smpi_process_index();
521   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
522   extra->type = TRACING_BARRIER;
523   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
524
525   mpi_coll_barrier_fun(MPI_COMM_WORLD);
526
527   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
528   log_timed_action (action, clock);
529 }
530
531
532 static void action_bcast(const char *const *action)
533 {
534   CHECK_ACTION_PARAMS(action, 1, 2);
535   double size = parse_double(action[2]);
536   double clock = smpi_process_simulated_elapsed();
537   int root=0;
538   /*
539    * Initialize MPI_CURRENT_TYPE in order to decrease
540    * the number of the checks
541    * */
542   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
543
544   if(action[3]) {
545     root= atoi(action[3]);
546     if(action[4]) {
547       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
548     }
549   }
550
551   int rank = smpi_process_index();
552   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
553
554   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
555   extra->type = TRACING_BCAST;
556   extra->send_size = size;
557   extra->root = root_traced;
558   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
559   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
560   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
561
562   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
563
564   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
565   log_timed_action (action, clock);
566 }
567
568 static void action_reduce(const char *const *action)
569 {
570   CHECK_ACTION_PARAMS(action, 2, 2);
571   double comm_size = parse_double(action[2]);
572   double comp_size = parse_double(action[3]);
573   double clock = smpi_process_simulated_elapsed();
574   int root=0;
575   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
576
577   if(action[4]) {
578     root= atoi(action[4]);
579     if(action[5]) {
580       MPI_CURRENT_TYPE=decode_datatype(action[5]);
581     }
582   }
583   
584   
585
586   int rank = smpi_process_index();
587   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
588   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
589   extra->type = TRACING_REDUCE;
590   extra->send_size = comm_size;
591   extra->comp_size = comp_size;
592   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
593   extra->root = root_traced;
594
595   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
596
597   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
598   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
599    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
600    smpi_execute_flops(comp_size);
601
602   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
603   log_timed_action (action, clock);
604 }
605
606 static void action_allReduce(const char *const *action) {
607   CHECK_ACTION_PARAMS(action, 2, 1);
608   double comm_size = parse_double(action[2]);
609   double comp_size = parse_double(action[3]);
610
611   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
612   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
613
614   double clock = smpi_process_simulated_elapsed();
615   int rank = smpi_process_index();
616   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
617   extra->type = TRACING_ALLREDUCE;
618   extra->send_size = comm_size;
619   extra->comp_size = comp_size;
620   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
621   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
622
623   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
624   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
625   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
626   smpi_execute_flops(comp_size);
627
628   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
629   log_timed_action (action, clock);
630 }
631
632 static void action_allToAll(const char *const *action) {
633   double clock = smpi_process_simulated_elapsed();
634   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
635   int send_size = parse_double(action[2]);
636   int recv_size = parse_double(action[3]);
637   MPI_Datatype MPI_CURRENT_TYPE2;
638
639   if(action[4]) {
640     MPI_CURRENT_TYPE=decode_datatype(action[4]);
641     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
642   }
643   else {
644     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
645     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
646   }
647   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
648   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
649
650   int rank = smpi_process_index();
651   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
652   extra->type = TRACING_ALLTOALL;
653   extra->send_size = send_size;
654   extra->recv_size = recv_size;
655   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
656   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
657
658   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
659
660   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
661
662   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
663   log_timed_action (action, clock);
664 }
665
666
667 static void action_gather(const char *const *action) {
668   /*
669  The structure of the gather action for the rank 0 (total 4 processes) 
670  is the following:   
671  0 gather 68 68 0 0 0
672
673   where: 
674   1) 68 is the sendcounts
675   2) 68 is the recvcounts
676   3) 0 is the root node
677   4) 0 is the send datatype id, see decode_datatype()
678   5) 0 is the recv datatype id, see decode_datatype()
679   */
680   CHECK_ACTION_PARAMS(action, 2, 3);
681   double clock = smpi_process_simulated_elapsed();
682   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
683   int send_size = parse_double(action[2]);
684   int recv_size = parse_double(action[3]);
685   MPI_Datatype MPI_CURRENT_TYPE2;
686   if(action[5]) {
687     MPI_CURRENT_TYPE=decode_datatype(action[5]);
688     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
689   } else {
690     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
691     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
692   }
693   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
694   void *recv = NULL;
695   int root=0;
696   if(action[4])
697     root=atoi(action[4]);
698   int rank = smpi_comm_rank(MPI_COMM_WORLD);
699
700   if(rank==root)
701     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
702
703   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
704   extra->type = TRACING_GATHER;
705   extra->send_size = send_size;
706   extra->recv_size = recv_size;
707   extra->root = root;
708   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
709   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
710
711   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
712
713   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
714                 recv, recv_size, MPI_CURRENT_TYPE2,
715                 root, MPI_COMM_WORLD);
716
717   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
718   log_timed_action (action, clock);
719 }
720
721
722
723 static void action_gatherv(const char *const *action) {
724   /*
725  The structure of the gatherv action for the rank 0 (total 4 processes)
726  is the following:
727  0 gather 68 68 10 10 10 0 0 0
728
729   where:
730   1) 68 is the sendcount
731   2) 68 10 10 10 is the recvcounts
732   3) 0 is the root node
733   4) 0 is the send datatype id, see decode_datatype()
734   5) 0 is the recv datatype id, see decode_datatype()
735   */
736
737   double clock = smpi_process_simulated_elapsed();
738   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
739   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
740   int send_size = parse_double(action[2]);
741   int *disps = xbt_new0(int, comm_size);
742   int *recvcounts = xbt_new0(int, comm_size);
743   int i=0,recv_sum=0;
744
745   MPI_Datatype MPI_CURRENT_TYPE2;
746   if(action[4+comm_size]) {
747     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
748     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
749   } else {
750     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
751     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
752   }
753   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
754   void *recv = NULL;
755   for(i=0;i<comm_size;i++) {
756     recvcounts[i] = atoi(action[i+3]);
757     recv_sum=recv_sum+recvcounts[i];
758     disps[i] = 0;
759   }
760
761   int root=atoi(action[3+comm_size]);
762   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
763
764   if(rank==root)
765     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
766
767   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
768   extra->type = TRACING_GATHERV;
769   extra->send_size = send_size;
770   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
771   for(i=0; i< comm_size; i++)//copy data to avoid bad free
772     extra->recvcounts[i] = recvcounts[i];
773   extra->root = root;
774   extra->num_processes = comm_size;
775   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
776   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
777
778   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
779
780   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
781                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
782                 root, MPI_COMM_WORLD);
783
784   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
785   log_timed_action (action, clock);
786   xbt_free(recvcounts);
787   xbt_free(disps);
788 }
789
790 static void action_reducescatter(const char *const *action) {
791
792     /*
793  The structure of the reducescatter action for the rank 0 (total 4 processes) 
794  is the following:   
795 0 reduceScatter 275427 275427 275427 204020 11346849 0
796
797   where: 
798   1) The first four values after the name of the action declare the recvcounts array
799   2) The value 11346849 is the amount of instructions
800   3) The last value corresponds to the datatype, see decode_datatype().
801
802   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
803
804    */
805
806   double clock = smpi_process_simulated_elapsed();
807   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
808   CHECK_ACTION_PARAMS(action, comm_size+1, 1);
809   int comp_size = parse_double(action[2+comm_size]);
810   int *recvcounts = xbt_new0(int, comm_size);  
811   int *disps = xbt_new0(int, comm_size);  
812   int i=0;
813   int rank = smpi_process_index();
814   int size = 0;
815   if(action[3+comm_size])
816     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
817   else
818     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
819
820   for(i=0;i<comm_size;i++) {
821     recvcounts[i] = atoi(action[i+2]);
822     disps[i] = 0;
823     size+=recvcounts[i];
824   }
825
826   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
827   extra->type = TRACING_REDUCE_SCATTER;
828   extra->send_size = 0;
829   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
830   for(i=0; i< comm_size; i++)//copy data to avoid bad free
831     extra->recvcounts[i] = recvcounts[i];
832   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
833   extra->comp_size = comp_size;
834   extra->num_processes = comm_size;
835
836   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
837
838   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
839   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
840    
841    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
842        MPI_COMM_WORLD);
843    smpi_execute_flops(comp_size);
844
845
846   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
847   xbt_free(recvcounts);
848   xbt_free(disps);
849   log_timed_action (action, clock);
850 }
851
852 static void action_allgather(const char *const *action) {
853   /*
854  The structure of the allgather action for the rank 0 (total 4 processes) 
855  is the following:   
856   0 allGather 275427 275427
857
858   where: 
859   1) 275427 is the sendcount
860   2) 275427 is the recvcount
861   3) No more values mean that the datatype for sent and receive buffer
862   is the default one, see decode_datatype().
863
864    */
865
866   double clock = smpi_process_simulated_elapsed();
867
868   CHECK_ACTION_PARAMS(action, 2, 2);
869   int sendcount=atoi(action[2]); 
870   int recvcount=atoi(action[3]); 
871
872   MPI_Datatype MPI_CURRENT_TYPE2;
873
874   if(action[4]) {
875     MPI_CURRENT_TYPE = decode_datatype(action[3]);
876     MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
877   } else {
878     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
879     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
880   }
881   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
882   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
883
884   int rank = smpi_process_index();
885   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
886   extra->type = TRACING_ALLGATHER;
887   extra->send_size = sendcount;
888   extra->recv_size= recvcount;
889   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
890   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
891   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
892
893   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
894
895   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
896
897   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
898   log_timed_action (action, clock);
899 }
900
901 static void action_allgatherv(const char *const *action) {
902
903   /*
904  The structure of the allgatherv action for the rank 0 (total 4 processes) 
905  is the following:   
906 0 allGatherV 275427 275427 275427 275427 204020
907
908   where: 
909   1) 275427 is the sendcount
910   2) The next four elements declare the recvcounts array
911   3) No more values mean that the datatype for sent and receive buffer
912   is the default one, see decode_datatype().
913
914    */
915
916   double clock = smpi_process_simulated_elapsed();
917
918   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
919   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
920   int i=0;
921   int sendcount=atoi(action[2]);
922   int *recvcounts = xbt_new0(int, comm_size);  
923   int *disps = xbt_new0(int, comm_size);  
924   int recv_sum=0;  
925   MPI_Datatype MPI_CURRENT_TYPE2;
926
927   if(action[3+comm_size]) {
928     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
929     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
930   } else {
931     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
932     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
933   }
934   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
935
936   for(i=0;i<comm_size;i++) {
937     recvcounts[i] = atoi(action[i+3]);
938     recv_sum=recv_sum+recvcounts[i];
939   }
940   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
941
942   int rank = smpi_process_index();
943   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
944   extra->type = TRACING_ALLGATHERV;
945   extra->send_size = sendcount;
946   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
947   for(i=0; i< comm_size; i++)//copy data to avoid bad free
948     extra->recvcounts[i] = recvcounts[i];
949   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
950   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
951   extra->num_processes = comm_size;
952
953   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
954
955   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
956
957   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
958   log_timed_action (action, clock);
959   xbt_free(recvcounts);
960   xbt_free(disps);
961 }
962
963 static void action_allToAllv(const char *const *action) {
964   /*
965  The structure of the allToAllV action for the rank 0 (total 4 processes) 
966  is the following:   
967   0 allToAllV 100 1 7 10 12 100 1 70 10 5
968
969   where: 
970   1) 100 is the size of the send buffer *sizeof(int),
971   2) 1 7 10 12 is the sendcounts array
972   3) 100*sizeof(int) is the size of the receiver buffer
973   4)  1 70 10 5 is the recvcounts array
974
975    */
976
977
978   double clock = smpi_process_simulated_elapsed();
979
980   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
981   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
982   int send_buf_size=0,recv_buf_size=0,i=0;
983   int *sendcounts = xbt_new0(int, comm_size);  
984   int *recvcounts = xbt_new0(int, comm_size);  
985   int *senddisps = xbt_new0(int, comm_size);  
986   int *recvdisps = xbt_new0(int, comm_size);  
987
988   MPI_Datatype MPI_CURRENT_TYPE2;
989
990   send_buf_size=parse_double(action[2]);
991   recv_buf_size=parse_double(action[3+comm_size]);
992   if(action[4+2*comm_size]) {
993     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
994     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
995   }
996   else {
997       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
998       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
999   }
1000
1001   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1002   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1003
1004   for(i=0;i<comm_size;i++) {
1005     sendcounts[i] = atoi(action[i+3]);
1006     recvcounts[i] = atoi(action[i+4+comm_size]);
1007   }
1008
1009
1010   int rank = smpi_process_index();
1011   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1012   extra->type = TRACING_ALLTOALLV;
1013   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1014   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1015   extra->num_processes = comm_size;
1016
1017   for(i=0; i< comm_size; i++){//copy data to avoid bad free
1018     extra->send_size += sendcounts[i];
1019     extra->sendcounts[i] = sendcounts[i];
1020     extra->recv_size += recvcounts[i];
1021     extra->recvcounts[i] = recvcounts[i];
1022   }
1023   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1024   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1025
1026   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1027
1028   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1029                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1030                                MPI_COMM_WORLD);
1031
1032   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1033   log_timed_action (action, clock);
1034   xbt_free(sendcounts);
1035   xbt_free(recvcounts);
1036   xbt_free(senddisps);
1037   xbt_free(recvdisps);
1038 }
1039
1040 void smpi_replay_run(int *argc, char***argv){
1041   /* First initializes everything */
1042   smpi_process_init(argc, argv);
1043   smpi_process_mark_as_initialized();
1044   smpi_process_set_replaying(1);
1045
1046   int rank = smpi_process_index();
1047   TRACE_smpi_init(rank);
1048   TRACE_smpi_computing_init(rank);
1049   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1050   extra->type = TRACING_INIT;
1051   char *operation =bprintf("%s_init",__FUNCTION__);
1052   TRACE_smpi_collective_in(rank, -1, operation, extra);
1053   TRACE_smpi_collective_out(rank, -1, operation);
1054   free(operation);
1055
1056   if (!_xbt_replay_action_init()) {
1057     xbt_replay_action_register("init",       action_init);
1058     xbt_replay_action_register("finalize",   action_finalize);
1059     xbt_replay_action_register("comm_size",  action_comm_size);
1060     xbt_replay_action_register("comm_split", action_comm_split);
1061     xbt_replay_action_register("comm_dup",   action_comm_dup);
1062     xbt_replay_action_register("send",       action_send);
1063     xbt_replay_action_register("Isend",      action_Isend);
1064     xbt_replay_action_register("recv",       action_recv);
1065     xbt_replay_action_register("Irecv",      action_Irecv);
1066     xbt_replay_action_register("test",       action_test);
1067     xbt_replay_action_register("wait",       action_wait);
1068     xbt_replay_action_register("waitAll",    action_waitall);
1069     xbt_replay_action_register("barrier",    action_barrier);
1070     xbt_replay_action_register("bcast",      action_bcast);
1071     xbt_replay_action_register("reduce",     action_reduce);
1072     xbt_replay_action_register("allReduce",  action_allReduce);
1073     xbt_replay_action_register("allToAll",   action_allToAll);
1074     xbt_replay_action_register("allToAllV",  action_allToAllv);
1075     xbt_replay_action_register("gather",  action_gather);
1076     xbt_replay_action_register("gatherV",  action_gatherv);
1077     xbt_replay_action_register("allGather",  action_allgather);
1078     xbt_replay_action_register("allGatherV",  action_allgatherv);
1079     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1080     xbt_replay_action_register("compute",    action_compute);
1081   }
1082   
1083   //if we have a delayed start, sleep here.
1084   if(*argc>2){
1085     char *endptr;
1086     double value = strtod((*argv)[2], &endptr);
1087     if (*endptr != '\0')
1088       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1089     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1090     smpi_execute_flops(value);
1091   } else {
1092     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1093     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
1094     smpi_execute_flops(0.0);
1095   }
1096   
1097   /* Actually run the replay */
1098   xbt_replay_action_runner(*argc, *argv);
1099
1100   /* and now, finalize everything */
1101   double sim_time= 1.;
1102   /* One active process will stop. Decrease the counter*/
1103   XBT_DEBUG("There are %lu elements in reqq[*]",
1104             xbt_dynar_length(get_reqq_self()));
1105   if (!xbt_dynar_is_empty(get_reqq_self())){
1106     int count_requests=xbt_dynar_length(get_reqq_self());
1107     MPI_Request requests[count_requests];
1108     MPI_Status status[count_requests];
1109     unsigned int i;
1110
1111     xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1112     smpi_mpi_waitall(count_requests, requests, status);
1113     active_processes--;
1114   } else {
1115     active_processes--;
1116   }
1117
1118   if(!active_processes){
1119     /* Last process alive speaking */
1120     /* end the simulated timer */
1121     sim_time = smpi_process_simulated_elapsed();
1122   }
1123   
1124
1125   //TODO xbt_dynar_free_container(get_reqq_self()));
1126
1127   if(!active_processes){
1128     XBT_INFO("Simulation time %f", sim_time);
1129     _xbt_replay_action_exit();
1130     xbt_free(sendbuffer);
1131     xbt_free(recvbuffer);
1132     //xbt_free(reqq);
1133     xbt_dict_free(&reqq); //not need, data have been freed ???
1134     reqq = NULL;
1135   }
1136   
1137   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1138   extra_fin->type = TRACING_FINALIZE;
1139   operation =bprintf("%s_finalize",__FUNCTION__);
1140   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1141
1142   smpi_process_finalize();
1143
1144   TRACE_smpi_collective_out(rank, -1, operation);
1145   TRACE_smpi_finalize(smpi_process_index());
1146   smpi_process_destroy();
1147   free(operation);
1148 }