Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI/INSTR] Header cleanup.
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15
16 int communicator_size = 0;
17 static int active_processes = 0;
18 xbt_dict_t reqq = NULL;
19
20 MPI_Datatype MPI_DEFAULT_TYPE;
21 MPI_Datatype MPI_CURRENT_TYPE;
22
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
27
28 static void log_timed_action (const char *const *action, double clock){
29   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30     char *name = xbt_str_join_array(action, " ");
31     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
32     free(name);
33   }
34 }
35
36 static xbt_dynar_t get_reqq_self()
37 {
38    char * key = bprintf("%d", smpi_process_index());
39    xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
40    free(key);
41  
42    return dynar_mpi_request;
43 }
44
45 static void set_reqq_self(xbt_dynar_t mpi_request)
46 {
47    char * key = bprintf("%d", smpi_process_index());
48    xbt_dict_set(reqq, key, mpi_request, free);
49    free(key);
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (!smpi_process_get_replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (!smpi_process_get_replaying())
67   return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (!smpi_process_get_replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   double value;
84   char *endptr;
85   value = strtod(string, &endptr);
86   if (*endptr != '\0')
87     THROWF(unknown_error, 0, "%s is not a double", string);
88   return value;
89 }
90
91 static MPI_Datatype decode_datatype(const char *const action)
92 {
93 // Declared datatypes,
94   switch(atoi(action)) {
95     case 0:
96       MPI_CURRENT_TYPE=MPI_DOUBLE;
97       break;
98     case 1:
99       MPI_CURRENT_TYPE=MPI_INT;
100       break;
101     case 2:
102       MPI_CURRENT_TYPE=MPI_CHAR;
103       break;
104     case 3:
105       MPI_CURRENT_TYPE=MPI_SHORT;
106       break;
107     case 4:
108       MPI_CURRENT_TYPE=MPI_LONG;
109       break;
110     case 5:
111       MPI_CURRENT_TYPE=MPI_FLOAT;
112       break;
113     case 6:
114       MPI_CURRENT_TYPE=MPI_BYTE;
115       break;
116     default:
117       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118   }
119    return MPI_CURRENT_TYPE;
120 }
121
122
123 const char* encode_datatype(MPI_Datatype datatype, int* known)
124 {
125   //default type for output is set to MPI_BYTE
126   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
127   if(known)*known=1;
128   if (datatype==MPI_BYTE){
129       return "";
130   }
131   if(datatype==MPI_DOUBLE)
132       return "0";
133   if(datatype==MPI_INT)
134       return "1";
135   if(datatype==MPI_CHAR)
136       return "2";
137   if(datatype==MPI_SHORT)
138       return "3";
139   if(datatype==MPI_LONG)
140     return "4";
141   if(datatype==MPI_FLOAT)
142       return "5";
143   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
144   if(known)*known=0;
145   // default - not implemented.
146   // do not warn here as we pass in this function even for other trace formats
147   return "-1";
148 }
149
150 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
151     int i=0;\
152     while(action[i]!=NULL)\
153      i++;\
154     if(i<mandatory+2)                                           \
155     THROWF(arg_error, 0, "%s replay failed.\n" \
156           "%d items were given on the line. First two should be process_id and action.  " \
157           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
158           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
159   }
160
161 static void action_init(const char *const *action)
162 {
163   XBT_DEBUG("Initialize the counters");
164   CHECK_ACTION_PARAMS(action, 0, 1);
165   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
166   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
167
168   /* start a simulated timer */
169   smpi_process_simulated_start();
170   /*initialize the number of active processes */
171   active_processes = smpi_process_count();
172
173   if (!reqq) {
174     reqq = xbt_dict_new();
175   }
176
177   set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
178 }
179
180 static void action_finalize(const char *const *action)
181 {
182 }
183
184 static void action_comm_size(const char *const *action)
185 {
186   double clock = smpi_process_simulated_elapsed();
187
188   communicator_size = parse_double(action[2]);
189   log_timed_action (action, clock);
190 }
191
192 static void action_comm_split(const char *const *action)
193 {
194   double clock = smpi_process_simulated_elapsed();
195
196   log_timed_action (action, clock);
197 }
198
199 static void action_comm_dup(const char *const *action)
200 {
201   double clock = smpi_process_simulated_elapsed();
202
203   log_timed_action (action, clock);
204 }
205
206 static void action_compute(const char *const *action)
207 {
208   CHECK_ACTION_PARAMS(action, 1, 0);
209   double clock = smpi_process_simulated_elapsed();
210   double flops= parse_double(action[2]);
211   int rank = smpi_process_index();
212   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
213   extra->type=TRACING_COMPUTING;
214   extra->comp_size=flops;
215   TRACE_smpi_computing_in(rank, extra);
216
217   smpi_execute_flops(flops);
218
219   TRACE_smpi_computing_out(rank);
220   log_timed_action (action, clock);
221 }
222
223 static void action_send(const char *const *action)
224 {
225   CHECK_ACTION_PARAMS(action, 2, 1);
226   int to = atoi(action[2]);
227   double size=parse_double(action[3]);
228   double clock = smpi_process_simulated_elapsed();
229
230   if(action[4]) {
231     MPI_CURRENT_TYPE=decode_datatype(action[4]);
232   } else {
233     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
234   }
235
236   int rank = smpi_process_index();
237
238   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
239   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
240   extra->type = TRACING_SEND;
241   extra->send_size = size;
242   extra->src = rank;
243   extra->dst = dst_traced;
244   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
245   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
246   if (!TRACE_smpi_view_internals()) {
247     TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
248   }
249
250   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
251
252   log_timed_action (action, clock);
253
254   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
255 }
256
257 static void action_Isend(const char *const *action)
258 {
259   CHECK_ACTION_PARAMS(action, 2, 1);
260   int to = atoi(action[2]);
261   double size=parse_double(action[3]);
262   double clock = smpi_process_simulated_elapsed();
263   MPI_Request request;
264
265   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
266   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
267
268   int rank = smpi_process_index();
269   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
270   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
271   extra->type = TRACING_ISEND;
272   extra->send_size = size;
273   extra->src = rank;
274   extra->dst = dst_traced;
275   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
276   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
277   if (!TRACE_smpi_view_internals()) {
278     TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
279   }
280
281   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
282
283   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
284   request->send = 1;
285
286   xbt_dynar_push(get_reqq_self(),&request);
287
288   log_timed_action (action, clock);
289 }
290
291 static void action_recv(const char *const *action) {
292   CHECK_ACTION_PARAMS(action, 2, 1);
293   int from = atoi(action[2]);
294   double size=parse_double(action[3]);
295   double clock = smpi_process_simulated_elapsed();
296   MPI_Status status;
297
298   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
299   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
300
301   int rank = smpi_process_index();
302   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
303
304   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
305   extra->type = TRACING_RECV;
306   extra->send_size = size;
307   extra->src = src_traced;
308   extra->dst = rank;
309   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
310   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
311
312   //unknow size from the receiver pov
313   if(size==-1){
314       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
315       size=status.count;
316   }
317
318   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
319
320   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
321   if (!TRACE_smpi_view_internals()) {
322     TRACE_smpi_recv(rank, src_traced, rank);
323   }
324
325   log_timed_action (action, clock);
326 }
327
328 static void action_Irecv(const char *const *action)
329 {
330   CHECK_ACTION_PARAMS(action, 2, 1);
331   int from = atoi(action[2]);
332   double size=parse_double(action[3]);
333   double clock = smpi_process_simulated_elapsed();
334   MPI_Request request;
335
336   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
337   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
338
339   int rank = smpi_process_index();
340   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
341   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
342   extra->type = TRACING_IRECV;
343   extra->send_size = size;
344   extra->src = src_traced;
345   extra->dst = rank;
346   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
347   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
348   MPI_Status status;
349   //unknow size from the receiver pov
350   if(size==-1){
351       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
352       size=status.count;
353   }
354
355   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
356
357   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
358   request->recv = 1;
359   xbt_dynar_push(get_reqq_self(),&request);
360
361   log_timed_action (action, clock);
362 }
363
364 static void action_test(const char *const *action){
365   CHECK_ACTION_PARAMS(action, 0, 0);
366   double clock = smpi_process_simulated_elapsed();
367   MPI_Request request;
368   MPI_Status status;
369   int flag = true;
370
371   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
372   //if request is null here, this may mean that a previous test has succeeded 
373   //Different times in traced application and replayed version may lead to this 
374   //In this case, ignore the extra calls.
375   if(request){
376     int rank = smpi_process_index();
377     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
378     extra->type=TRACING_TEST;
379     TRACE_smpi_testing_in(rank, extra);
380
381     flag = smpi_mpi_test(&request, &status);
382
383     XBT_DEBUG("MPI_Test result: %d", flag);
384     /* push back request in dynar to be caught by a subsequent wait. if the test did succeed, the request is now NULL.*/
385     xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
386
387     TRACE_smpi_testing_out(rank);
388   }
389   log_timed_action (action, clock);
390 }
391
392 static void action_wait(const char *const *action){
393   CHECK_ACTION_PARAMS(action, 0, 0);
394   double clock = smpi_process_simulated_elapsed();
395   MPI_Request request;
396   MPI_Status status;
397
398   xbt_assert(xbt_dynar_length(get_reqq_self()),
399       "action wait not preceded by any irecv or isend: %s",
400       xbt_str_join_array(action," "));
401   request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
402
403   if (!request){
404     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
405     return;
406   }
407
408   int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
409
410   MPI_Group group = smpi_comm_group(request->comm);
411   int src_traced = smpi_group_rank(group, request->src);
412   int dst_traced = smpi_group_rank(group, request->dst);
413   int is_wait_for_receive = request->recv;
414   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
415   extra->type = TRACING_WAIT;
416   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
417
418   smpi_mpi_wait(&request, &status);
419
420   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
421   if (is_wait_for_receive)
422     TRACE_smpi_recv(rank, src_traced, dst_traced);
423   log_timed_action (action, clock);
424 }
425
426 static void action_waitall(const char *const *action){
427   CHECK_ACTION_PARAMS(action, 0, 0);
428   double clock = smpi_process_simulated_elapsed();
429   int count_requests=0;
430   unsigned int i=0;
431
432   count_requests=xbt_dynar_length(get_reqq_self());
433
434   if (count_requests>0) {
435     MPI_Request requests[count_requests];
436     MPI_Status status[count_requests];
437
438     /*  The reqq is an array of dynars. Its index corresponds to the rank.
439      Thus each rank saves its own requests to the array request. */
440     xbt_dynar_foreach(get_reqq_self(),i,requests[i]); 
441
442    //save information from requests
443    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
444    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
445    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
446    for (i = 0; (int)i < count_requests; i++) {
447     if(requests[i]){
448       int *asrc = xbt_new(int, 1);
449       int *adst = xbt_new(int, 1);
450       int *arecv = xbt_new(int, 1);
451       *asrc = requests[i]->src;
452       *adst = requests[i]->dst;
453       *arecv = requests[i]->recv;
454       xbt_dynar_insert_at(srcs, i, asrc);
455       xbt_dynar_insert_at(dsts, i, adst);
456       xbt_dynar_insert_at(recvs, i, arecv);
457       xbt_free(asrc);
458       xbt_free(adst);
459       xbt_free(arecv);
460     }else {
461       int *t = xbt_new(int, 1);
462       xbt_dynar_insert_at(srcs, i, t);
463       xbt_dynar_insert_at(dsts, i, t);
464       xbt_dynar_insert_at(recvs, i, t);
465       xbt_free(t);
466     }
467    }
468    int rank_traced = smpi_process_index();
469    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
470    extra->type = TRACING_WAITALL;
471    extra->send_size=count_requests;
472    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
473
474    smpi_mpi_waitall(count_requests, requests, status);
475
476    for (i = 0; (int)i < count_requests; i++) {
477     int src_traced, dst_traced, is_wait_for_receive;
478     xbt_dynar_get_cpy(srcs, i, &src_traced);
479     xbt_dynar_get_cpy(dsts, i, &dst_traced);
480     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
481     if (is_wait_for_receive) {
482       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
483     }
484    }
485    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
486    //clean-up of dynars
487    xbt_dynar_free(&srcs);
488    xbt_dynar_free(&dsts);
489    xbt_dynar_free(&recvs);
490
491    //TODO xbt_dynar_free_container(get_reqq_self());
492    set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
493   }
494   log_timed_action (action, clock);
495 }
496
497 static void action_barrier(const char *const *action){
498   double clock = smpi_process_simulated_elapsed();
499   int rank = smpi_process_index();
500   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
501   extra->type = TRACING_BARRIER;
502   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
503
504   mpi_coll_barrier_fun(MPI_COMM_WORLD);
505
506   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
507   log_timed_action (action, clock);
508 }
509
510 static void action_bcast(const char *const *action)
511 {
512   CHECK_ACTION_PARAMS(action, 1, 2);
513   double size = parse_double(action[2]);
514   double clock = smpi_process_simulated_elapsed();
515   int root=0;
516   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
517   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
518
519   if(action[3]) {
520     root= atoi(action[3]);
521     if(action[4]) {
522       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
523     }
524   }
525
526   int rank = smpi_process_index();
527   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
528
529   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
530   extra->type = TRACING_BCAST;
531   extra->send_size = size;
532   extra->root = root_traced;
533   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
534   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
535   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
536
537   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
538
539   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
540   log_timed_action (action, clock);
541 }
542
543 static void action_reduce(const char *const *action)
544 {
545   CHECK_ACTION_PARAMS(action, 2, 2);
546   double comm_size = parse_double(action[2]);
547   double comp_size = parse_double(action[3]);
548   double clock = smpi_process_simulated_elapsed();
549   int root=0;
550   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
551
552   if(action[4]) {
553     root= atoi(action[4]);
554     if(action[5]) {
555       MPI_CURRENT_TYPE=decode_datatype(action[5]);
556     }
557   }
558
559   int rank = smpi_process_index();
560   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
561   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
562   extra->type = TRACING_REDUCE;
563   extra->send_size = comm_size;
564   extra->comp_size = comp_size;
565   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
566   extra->root = root_traced;
567
568   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
569
570   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
571   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
572   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
573   smpi_execute_flops(comp_size);
574
575   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
576   log_timed_action (action, clock);
577 }
578
579 static void action_allReduce(const char *const *action) {
580   CHECK_ACTION_PARAMS(action, 2, 1);
581   double comm_size = parse_double(action[2]);
582   double comp_size = parse_double(action[3]);
583
584   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
585   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
586
587   double clock = smpi_process_simulated_elapsed();
588   int rank = smpi_process_index();
589   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
590   extra->type = TRACING_ALLREDUCE;
591   extra->send_size = comm_size;
592   extra->comp_size = comp_size;
593   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
594   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
595
596   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
597   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
598   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
599   smpi_execute_flops(comp_size);
600
601   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
602   log_timed_action (action, clock);
603 }
604
605 static void action_allToAll(const char *const *action) {
606   CHECK_ACTION_PARAMS(action, 2, 2); //two mandatory (send and recv volumes)
607                                      //two optional (corresponding datatypes)
608   double clock = smpi_process_simulated_elapsed();
609   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
610   int send_size = parse_double(action[2]);
611   int recv_size = parse_double(action[3]);
612   MPI_Datatype MPI_CURRENT_TYPE2;
613
614   if(action[4] && action[5]) {
615     MPI_CURRENT_TYPE=decode_datatype(action[4]);
616     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
617   }
618   else{
619     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
620     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
621   }
622
623   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
624   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
625
626   int rank = smpi_process_index();
627   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
628   extra->type = TRACING_ALLTOALL;
629   extra->send_size = send_size;
630   extra->recv_size = recv_size;
631   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
632   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
633
634   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
635
636   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
637
638   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
639   log_timed_action (action, clock);
640 }
641
642 static void action_gather(const char *const *action) {
643   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
644  0 gather 68 68 0 0 0
645
646   where: 
647   1) 68 is the sendcounts
648   2) 68 is the recvcounts
649   3) 0 is the root node
650   4) 0 is the send datatype id, see decode_datatype()
651   5) 0 is the recv datatype id, see decode_datatype()
652   */
653   CHECK_ACTION_PARAMS(action, 2, 3);
654   double clock = smpi_process_simulated_elapsed();
655   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
656   int send_size = parse_double(action[2]);
657   int recv_size = parse_double(action[3]);
658   MPI_Datatype MPI_CURRENT_TYPE2;
659   if(action[4] && action[5]) {
660     MPI_CURRENT_TYPE=decode_datatype(action[5]);
661     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
662   } else {
663     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
664     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
665   }
666   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
667   void *recv = NULL;
668   int root=0;
669   if(action[4])
670     root=atoi(action[4]);
671   int rank = smpi_comm_rank(MPI_COMM_WORLD);
672
673   if(rank==root)
674     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
675
676   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
677   extra->type = TRACING_GATHER;
678   extra->send_size = send_size;
679   extra->recv_size = recv_size;
680   extra->root = root;
681   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
682   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
683
684   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
685
686   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
687
688   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
689   log_timed_action (action, clock);
690 }
691
692 static void action_gatherv(const char *const *action) {
693   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
694  0 gather 68 68 10 10 10 0 0 0
695
696   where:
697   1) 68 is the sendcount
698   2) 68 10 10 10 is the recvcounts
699   3) 0 is the root node
700   4) 0 is the send datatype id, see decode_datatype()
701   5) 0 is the recv datatype id, see decode_datatype()
702   */
703
704   double clock = smpi_process_simulated_elapsed();
705   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
706   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
707   int send_size = parse_double(action[2]);
708   int *disps = xbt_new0(int, comm_size);
709   int *recvcounts = xbt_new0(int, comm_size);
710   int i=0,recv_sum=0;
711
712   MPI_Datatype MPI_CURRENT_TYPE2;
713   if(action[4+comm_size] && action[5+comm_size]) {
714     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
715     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
716   } else {
717     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
718     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
719   }
720   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
721   void *recv = NULL;
722   for(i=0;i<comm_size;i++) {
723     recvcounts[i] = atoi(action[i+3]);
724     recv_sum=recv_sum+recvcounts[i];
725     disps[i] = 0;
726   }
727
728   int root=atoi(action[3+comm_size]);
729   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
730
731   if(rank==root)
732     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
733
734   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
735   extra->type = TRACING_GATHERV;
736   extra->send_size = send_size;
737   extra->recvcounts= xbt_new(int,comm_size);
738   for(i=0; i< comm_size; i++)//copy data to avoid bad free
739     extra->recvcounts[i] = recvcounts[i];
740   extra->root = root;
741   extra->num_processes = comm_size;
742   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
743   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
744
745   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
746
747   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
748
749   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
750   log_timed_action (action, clock);
751   xbt_free(recvcounts);
752   xbt_free(disps);
753 }
754
755 static void action_reducescatter(const char *const *action) {
756  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
757 0 reduceScatter 275427 275427 275427 204020 11346849 0
758
759   where: 
760   1) The first four values after the name of the action declare the recvcounts array
761   2) The value 11346849 is the amount of instructions
762   3) The last value corresponds to the datatype, see decode_datatype().
763
764   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
765   double clock = smpi_process_simulated_elapsed();
766   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
767   CHECK_ACTION_PARAMS(action, comm_size+1, 1);
768   int comp_size = parse_double(action[2+comm_size]);
769   int *recvcounts = xbt_new0(int, comm_size);  
770   int *disps = xbt_new0(int, comm_size);  
771   int i=0;
772   int rank = smpi_process_index();
773   int size = 0;
774   if(action[3+comm_size])
775     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
776   else
777     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
778
779   for(i=0;i<comm_size;i++) {
780     recvcounts[i] = atoi(action[i+2]);
781     disps[i] = 0;
782     size+=recvcounts[i];
783   }
784
785   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
786   extra->type = TRACING_REDUCE_SCATTER;
787   extra->send_size = 0;
788   extra->recvcounts= xbt_new(int, comm_size);
789   for(i=0; i< comm_size; i++)//copy data to avoid bad free
790     extra->recvcounts[i] = recvcounts[i];
791   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
792   extra->comp_size = comp_size;
793   extra->num_processes = comm_size;
794
795   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
796
797   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
798   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
799    
800    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
801    smpi_execute_flops(comp_size);
802
803   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
804   xbt_free(recvcounts);
805   xbt_free(disps);
806   log_timed_action (action, clock);
807 }
808
809 static void action_allgather(const char *const *action) {
810   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
811   0 allGather 275427 275427
812
813   where: 
814   1) 275427 is the sendcount
815   2) 275427 is the recvcount
816   3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().  */
817   double clock = smpi_process_simulated_elapsed();
818
819   CHECK_ACTION_PARAMS(action, 2, 2);
820   int sendcount=atoi(action[2]); 
821   int recvcount=atoi(action[3]); 
822
823   MPI_Datatype MPI_CURRENT_TYPE2;
824
825   if(action[4] && action[5]) {
826     MPI_CURRENT_TYPE = decode_datatype(action[4]);
827     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
828   } else {
829     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
830     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
831   }
832   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
833   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
834
835   int rank = smpi_process_index();
836   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
837   extra->type = TRACING_ALLGATHER;
838   extra->send_size = sendcount;
839   extra->recv_size= recvcount;
840   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
841   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
842   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
843
844   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
845
846   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
847
848   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
849   log_timed_action (action, clock);
850 }
851
852 static void action_allgatherv(const char *const *action) {
853   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
854 0 allGatherV 275427 275427 275427 275427 204020
855
856   where: 
857   1) 275427 is the sendcount
858   2) The next four elements declare the recvcounts array
859   3) No more values mean that the datatype for sent and receive buffer
860   is the default one, see decode_datatype(). */
861   double clock = smpi_process_simulated_elapsed();
862
863   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
864   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
865   int i=0;
866   int sendcount=atoi(action[2]);
867   int *recvcounts = xbt_new0(int, comm_size);  
868   int *disps = xbt_new0(int, comm_size);  
869   int recv_sum=0;  
870   MPI_Datatype MPI_CURRENT_TYPE2;
871
872   if(action[3+comm_size] && action[4+comm_size]) {
873     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
874     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
875   } else {
876     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
877     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
878   }
879   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
880
881   for(i=0;i<comm_size;i++) {
882     recvcounts[i] = atoi(action[i+3]);
883     recv_sum=recv_sum+recvcounts[i];
884   }
885   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
886
887   int rank = smpi_process_index();
888   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
889   extra->type = TRACING_ALLGATHERV;
890   extra->send_size = sendcount;
891   extra->recvcounts= xbt_new(int, comm_size);
892   for(i=0; i< comm_size; i++)//copy data to avoid bad free
893     extra->recvcounts[i] = recvcounts[i];
894   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
895   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
896   extra->num_processes = comm_size;
897
898   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
899
900   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
901                           MPI_COMM_WORLD);
902
903   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
904   log_timed_action (action, clock);
905   xbt_free(recvcounts);
906   xbt_free(disps);
907 }
908
909 static void action_allToAllv(const char *const *action) {
910   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
911   0 allToAllV 100 1 7 10 12 100 1 70 10 5
912
913   where: 
914   1) 100 is the size of the send buffer *sizeof(int),
915   2) 1 7 10 12 is the sendcounts array
916   3) 100*sizeof(int) is the size of the receiver buffer
917   4)  1 70 10 5 is the recvcounts array */
918   double clock = smpi_process_simulated_elapsed();
919
920   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
921   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
922   int send_buf_size=0,recv_buf_size=0,i=0;
923   int *sendcounts = xbt_new0(int, comm_size);  
924   int *recvcounts = xbt_new0(int, comm_size);  
925   int *senddisps = xbt_new0(int, comm_size);  
926   int *recvdisps = xbt_new0(int, comm_size);  
927
928   MPI_Datatype MPI_CURRENT_TYPE2;
929
930   send_buf_size=parse_double(action[2]);
931   recv_buf_size=parse_double(action[3+comm_size]);
932   if(action[4+2*comm_size] && action[5+2*comm_size]) {
933     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
934     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
935   }
936   else{
937     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
938     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
939   }
940
941   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
942   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
943
944   for(i=0;i<comm_size;i++) {
945     sendcounts[i] = atoi(action[i+3]);
946     recvcounts[i] = atoi(action[i+4+comm_size]);
947   }
948
949   int rank = smpi_process_index();
950   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
951   extra->type = TRACING_ALLTOALLV;
952   extra->recvcounts= xbt_new(int, comm_size);
953   extra->sendcounts= xbt_new(int, comm_size);
954   extra->num_processes = comm_size;
955
956   for(i=0; i< comm_size; i++){//copy data to avoid bad free
957     extra->send_size += sendcounts[i];
958     extra->sendcounts[i] = sendcounts[i];
959     extra->recv_size += recvcounts[i];
960     extra->recvcounts[i] = recvcounts[i];
961   }
962   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
963   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
964
965   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
966
967   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
968                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
969
970   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
971   log_timed_action (action, clock);
972   xbt_free(sendcounts);
973   xbt_free(recvcounts);
974   xbt_free(senddisps);
975   xbt_free(recvdisps);
976 }
977
978 void smpi_replay_run(int *argc, char***argv){
979   /* First initializes everything */
980   smpi_process_init(argc, argv);
981   smpi_process_mark_as_initialized();
982   smpi_process_set_replaying(1);
983
984   int rank = smpi_process_index();
985   TRACE_smpi_init(rank);
986   TRACE_smpi_computing_init(rank);
987   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
988   extra->type = TRACING_INIT;
989   char *operation =bprintf("%s_init",__FUNCTION__);
990   TRACE_smpi_collective_in(rank, -1, operation, extra);
991   TRACE_smpi_collective_out(rank, -1, operation);
992   free(operation);
993
994   if (!_xbt_replay_action_init()) {
995     xbt_replay_action_register("init",       action_init);
996     xbt_replay_action_register("finalize",   action_finalize);
997     xbt_replay_action_register("comm_size",  action_comm_size);
998     xbt_replay_action_register("comm_split", action_comm_split);
999     xbt_replay_action_register("comm_dup",   action_comm_dup);
1000     xbt_replay_action_register("send",       action_send);
1001     xbt_replay_action_register("Isend",      action_Isend);
1002     xbt_replay_action_register("recv",       action_recv);
1003     xbt_replay_action_register("Irecv",      action_Irecv);
1004     xbt_replay_action_register("test",       action_test);
1005     xbt_replay_action_register("wait",       action_wait);
1006     xbt_replay_action_register("waitAll",    action_waitall);
1007     xbt_replay_action_register("barrier",    action_barrier);
1008     xbt_replay_action_register("bcast",      action_bcast);
1009     xbt_replay_action_register("reduce",     action_reduce);
1010     xbt_replay_action_register("allReduce",  action_allReduce);
1011     xbt_replay_action_register("allToAll",   action_allToAll);
1012     xbt_replay_action_register("allToAllV",  action_allToAllv);
1013     xbt_replay_action_register("gather",  action_gather);
1014     xbt_replay_action_register("gatherV",  action_gatherv);
1015     xbt_replay_action_register("allGather",  action_allgather);
1016     xbt_replay_action_register("allGatherV",  action_allgatherv);
1017     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1018     xbt_replay_action_register("compute",    action_compute);
1019   }
1020
1021   //if we have a delayed start, sleep here.
1022   if(*argc>2){
1023     char *endptr;
1024     double value = strtod((*argv)[2], &endptr);
1025     if (*endptr != '\0')
1026       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1027     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1028     smpi_execute_flops(value);
1029   } else {
1030     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1031     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
1032     smpi_execute_flops(0.0);
1033   }
1034
1035   /* Actually run the replay */
1036   xbt_replay_action_runner(*argc, *argv);
1037
1038   /* and now, finalize everything */
1039   double sim_time= 1.;
1040   /* One active process will stop. Decrease the counter*/
1041   XBT_DEBUG("There are %lu elements in reqq[*]", xbt_dynar_length(get_reqq_self()));
1042   if (!xbt_dynar_is_empty(get_reqq_self())){
1043     int count_requests=xbt_dynar_length(get_reqq_self());
1044     MPI_Request requests[count_requests];
1045     MPI_Status status[count_requests];
1046     unsigned int i;
1047
1048     xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1049     smpi_mpi_waitall(count_requests, requests, status);
1050     active_processes--;
1051   } else {
1052     active_processes--;
1053   }
1054
1055   if(!active_processes){
1056     /* Last process alive speaking */
1057     /* end the simulated timer */
1058     sim_time = smpi_process_simulated_elapsed();
1059   }
1060
1061   //TODO xbt_dynar_free_container(get_reqq_self()));
1062
1063   if(!active_processes){
1064     XBT_INFO("Simulation time %f", sim_time);
1065     _xbt_replay_action_exit();
1066     xbt_free(sendbuffer);
1067     xbt_free(recvbuffer);
1068     //xbt_free(reqq);
1069     xbt_dict_free(&reqq); //not need, data have been freed ???
1070     reqq = NULL;
1071   }
1072
1073   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1074   extra_fin->type = TRACING_FINALIZE;
1075   operation =bprintf("%s_finalize",__FUNCTION__);
1076   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1077
1078   smpi_process_finalize();
1079
1080   TRACE_smpi_collective_out(rank, -1, operation);
1081   TRACE_smpi_finalize(smpi_process_index());
1082   smpi_process_destroy();
1083   free(operation);
1084 }