Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[mc] Replace the override word with MC_OVERRIDE
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36   if (!smpi_process_get_replaying())
37         return xbt_malloc(size);
38   if (sendbuffer_size<size){
39     sendbuffer=xbt_realloc(sendbuffer,size);
40     sendbuffer_size=size;
41   }
42   return sendbuffer;
43 }
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46   if (!smpi_process_get_replaying())
47         return xbt_malloc(size);
48   if (recvbuffer_size<size){
49     recvbuffer=xbt_realloc(recvbuffer,size);
50     recvbuffer_size=size;
51   }
52   return sendbuffer;
53 }
54
55 void smpi_free_tmp_buffer(void* buf){
56   if (!smpi_process_get_replaying())
57     xbt_free(buf);
58 }
59
60 /* Helper function */
61 static double parse_double(const char *string)
62 {
63   double value;
64   char *endptr;
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static MPI_Datatype decode_datatype(const char *const action)
72 {
73 // Declared datatypes,
74
75   switch(atoi(action))
76   {
77     case 0:
78       MPI_CURRENT_TYPE=MPI_DOUBLE;
79       break;
80     case 1:
81       MPI_CURRENT_TYPE=MPI_INT;
82       break;
83     case 2:
84       MPI_CURRENT_TYPE=MPI_CHAR;
85       break;
86     case 3:
87       MPI_CURRENT_TYPE=MPI_SHORT;
88       break;
89     case 4:
90       MPI_CURRENT_TYPE=MPI_LONG;
91       break;
92     case 5:
93       MPI_CURRENT_TYPE=MPI_FLOAT;
94       break;
95     case 6:
96       MPI_CURRENT_TYPE=MPI_BYTE;
97       break;
98     default:
99       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
100
101   }
102    return MPI_CURRENT_TYPE;
103 }
104
105
106 const char* encode_datatype(MPI_Datatype datatype, int* known)
107 {
108
109   //default type for output is set to MPI_BYTE
110   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111   if(known)*known=1;
112   if (datatype==MPI_BYTE){
113       return "";
114   }
115   if(datatype==MPI_DOUBLE)
116       return "0";
117   if(datatype==MPI_INT)
118       return "1";
119   if(datatype==MPI_CHAR)
120       return "2";
121   if(datatype==MPI_SHORT)
122       return "3";
123   if(datatype==MPI_LONG)
124     return "4";
125   if(datatype==MPI_FLOAT)
126       return "5";
127   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
128   if(known)*known=0;
129   // default - not implemented.
130   // do not warn here as we pass in this function even for other trace formats
131   return "-1";
132 }
133
134 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
135     int i=0;\
136     while(action[i]!=NULL)\
137      i++;\
138     if(i<mandatory+2)                                           \
139     THROWF(arg_error, 0, "%s replay failed.\n" \
140           "%d items were given on the line. First two should be process_id and action.  " \
141           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
142           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
143   }
144
145
146 static void action_init(const char *const *action)
147 {
148   int i;
149   XBT_DEBUG("Initialize the counters");
150   CHECK_ACTION_PARAMS(action, 0, 1);
151   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
152   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
153
154   /* start a simulated timer */
155   smpi_process_simulated_start();
156   /*initialize the number of active processes */
157   active_processes = smpi_process_count();
158
159   if (!reqq) {
160     reqq=xbt_new0(xbt_dynar_t,active_processes);
161
162     for(i=0;i<active_processes;i++){
163       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
164     }
165   }
166 }
167
168 static void action_finalize(const char *const *action)
169 {
170 }
171
172 static void action_comm_size(const char *const *action)
173 {
174   double clock = smpi_process_simulated_elapsed();
175
176   communicator_size = parse_double(action[2]);
177   log_timed_action (action, clock);
178 }
179
180 static void action_comm_split(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183
184   log_timed_action (action, clock);
185 }
186
187 static void action_comm_dup(const char *const *action)
188 {
189   double clock = smpi_process_simulated_elapsed();
190
191   log_timed_action (action, clock);
192 }
193
194 static void action_compute(const char *const *action)
195 {
196   CHECK_ACTION_PARAMS(action, 1, 0);
197   double clock = smpi_process_simulated_elapsed();
198   double flops= parse_double(action[2]);
199   int rank = smpi_process_index();
200   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
201   extra->type=TRACING_COMPUTING;
202   extra->comp_size=flops;
203   TRACE_smpi_computing_in(rank, extra);
204
205   smpi_execute_flops(flops);
206
207   TRACE_smpi_computing_out(rank);
208   log_timed_action (action, clock);
209 }
210
211 static void action_send(const char *const *action)
212 {
213   CHECK_ACTION_PARAMS(action, 2, 1);
214   int to = atoi(action[2]);
215   double size=parse_double(action[3]);
216   double clock = smpi_process_simulated_elapsed();
217
218   if(action[4]) {
219     MPI_CURRENT_TYPE=decode_datatype(action[4]);
220   } else {
221     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
222   }
223
224   int rank = smpi_process_index();
225
226   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
227   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
228   extra->type = TRACING_SEND;
229   extra->send_size = size;
230   extra->src = rank;
231   extra->dst = dst_traced;
232   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
233   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
234   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
235
236   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
237
238   log_timed_action (action, clock);
239
240   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
241 }
242
243 static void action_Isend(const char *const *action)
244 {
245   CHECK_ACTION_PARAMS(action, 2, 1);
246   int to = atoi(action[2]);
247   double size=parse_double(action[3]);
248   double clock = smpi_process_simulated_elapsed();
249   MPI_Request request;
250
251   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
252   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
253
254   int rank = smpi_process_index();
255   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
256   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
257   extra->type = TRACING_ISEND;
258   extra->send_size = size;
259   extra->src = rank;
260   extra->dst = dst_traced;
261   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
262   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
263   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
264
265   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
266
267   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
268   request->send = 1;
269
270   xbt_dynar_push(reqq[smpi_process_index()],&request);
271
272   log_timed_action (action, clock);
273 }
274
275 static void action_recv(const char *const *action) {
276   CHECK_ACTION_PARAMS(action, 2, 1);
277   int from = atoi(action[2]);
278   double size=parse_double(action[3]);
279   double clock = smpi_process_simulated_elapsed();
280   MPI_Status status;
281
282   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
283   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
284
285   int rank = smpi_process_index();
286   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
287
288   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
289   extra->type = TRACING_RECV;
290   extra->send_size = size;
291   extra->src = src_traced;
292   extra->dst = rank;
293   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
294   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
295
296   //unknow size from the receiver pov
297   if(size==-1){
298       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
299       size=status.count;
300   }
301
302   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
303
304   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
305   TRACE_smpi_recv(rank, src_traced, rank);
306
307   log_timed_action (action, clock);
308 }
309
310 static void action_Irecv(const char *const *action)
311 {
312   CHECK_ACTION_PARAMS(action, 2, 1);
313   int from = atoi(action[2]);
314   double size=parse_double(action[3]);
315   double clock = smpi_process_simulated_elapsed();
316   MPI_Request request;
317
318   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
319   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
320
321   int rank = smpi_process_index();
322   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
323   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
324   extra->type = TRACING_IRECV;
325   extra->send_size = size;
326   extra->src = src_traced;
327   extra->dst = rank;
328   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
329   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
330   MPI_Status status;
331   //unknow size from the receiver pov
332   if(size==-1){
333       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
334       size=status.count;
335   }
336
337   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
338
339   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
340   request->recv = 1;
341   xbt_dynar_push(reqq[smpi_process_index()],&request);
342
343   log_timed_action (action, clock);
344 }
345
346 static void action_test(const char *const *action){
347   CHECK_ACTION_PARAMS(action, 0, 0);
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Request request;
350   MPI_Status status;
351   int flag = TRUE;
352
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354   //if request is null here, this may mean that a previous test has succeeded 
355   //Different times in traced application and replayed version may lead to this 
356   //In this case, ignore the extra calls.
357   if(request){
358           int rank = smpi_process_index();
359           instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
360           extra->type=TRACING_TEST;
361           TRACE_smpi_testing_in(rank, extra);
362
363           flag = smpi_mpi_test(&request, &status);
364
365           XBT_DEBUG("MPI_Test result: %d", flag);
366           /* push back request in dynar to be caught by a subsequent wait. if the test
367            * did succeed, the request is now NULL.
368            */
369           xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
370
371           TRACE_smpi_testing_out(rank);
372   }
373   log_timed_action (action, clock);
374 }
375
376 static void action_wait(const char *const *action){
377   CHECK_ACTION_PARAMS(action, 0, 0);
378   double clock = smpi_process_simulated_elapsed();
379   MPI_Request request;
380   MPI_Status status;
381
382   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
383       "action wait not preceded by any irecv or isend: %s",
384       xbt_str_join_array(action," "));
385   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
386
387   if (!request){
388     /* Assuming that the trace is well formed, this mean the comm might have
389      * been caught by a MPI_test. Then just return.
390      */
391     return;
392   }
393
394   int rank = request->comm != MPI_COMM_NULL
395       ? smpi_comm_rank(request->comm)
396       : -1;
397
398   MPI_Group group = smpi_comm_group(request->comm);
399   int src_traced = smpi_group_rank(group, request->src);
400   int dst_traced = smpi_group_rank(group, request->dst);
401   int is_wait_for_receive = request->recv;
402   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
403   extra->type = TRACING_WAIT;
404   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
405
406   smpi_mpi_wait(&request, &status);
407
408   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
409   if (is_wait_for_receive)
410     TRACE_smpi_recv(rank, src_traced, dst_traced);
411   log_timed_action (action, clock);
412 }
413
414 static void action_waitall(const char *const *action){
415   CHECK_ACTION_PARAMS(action, 0, 0);
416   double clock = smpi_process_simulated_elapsed();
417   int count_requests=0;
418   unsigned int i=0;
419
420   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
421
422   if (count_requests>0) {
423     MPI_Request requests[count_requests];
424     MPI_Status status[count_requests];
425
426     /*  The reqq is an array of dynars. Its index corresponds to the rank.
427      Thus each rank saves its own requests to the array request. */
428     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
429
430    //save information from requests
431
432    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
433    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
434    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
435    for (i = 0; i < count_requests; i++) {
436     if(requests[i]){
437       int *asrc = xbt_new(int, 1);
438       int *adst = xbt_new(int, 1);
439       int *arecv = xbt_new(int, 1);
440       *asrc = requests[i]->src;
441       *adst = requests[i]->dst;
442       *arecv = requests[i]->recv;
443       xbt_dynar_insert_at(srcs, i, asrc);
444       xbt_dynar_insert_at(dsts, i, adst);
445       xbt_dynar_insert_at(recvs, i, arecv);
446       xbt_free(asrc);
447       xbt_free(adst);
448       xbt_free(arecv);
449     }else {
450       int *t = xbt_new(int, 1);
451       xbt_dynar_insert_at(srcs, i, t);
452       xbt_dynar_insert_at(dsts, i, t);
453       xbt_dynar_insert_at(recvs, i, t);
454       xbt_free(t);
455     }
456    }
457    int rank_traced = smpi_process_index();
458    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
459    extra->type = TRACING_WAITALL;
460    extra->send_size=count_requests;
461    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
462
463    smpi_mpi_waitall(count_requests, requests, status);
464
465    for (i = 0; i < count_requests; i++) {
466     int src_traced, dst_traced, is_wait_for_receive;
467     xbt_dynar_get_cpy(srcs, i, &src_traced);
468     xbt_dynar_get_cpy(dsts, i, &dst_traced);
469     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
470     if (is_wait_for_receive) {
471       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
472     }
473    }
474    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
475    //clean-up of dynars
476    xbt_dynar_free(&srcs);
477    xbt_dynar_free(&dsts);
478    xbt_dynar_free(&recvs);
479
480    int freedrank=smpi_process_index();
481    xbt_dynar_free_container(&(reqq[freedrank]));
482    reqq[freedrank]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
483   }
484   log_timed_action (action, clock);
485 }
486
487 static void action_barrier(const char *const *action){
488   double clock = smpi_process_simulated_elapsed();
489   int rank = smpi_process_index();
490   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
491   extra->type = TRACING_BARRIER;
492   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
493
494   mpi_coll_barrier_fun(MPI_COMM_WORLD);
495
496   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
497   log_timed_action (action, clock);
498 }
499
500
501 static void action_bcast(const char *const *action)
502 {
503   CHECK_ACTION_PARAMS(action, 1, 2);
504   double size = parse_double(action[2]);
505   double clock = smpi_process_simulated_elapsed();
506   int root=0;
507   /*
508    * Initialize MPI_CURRENT_TYPE in order to decrease
509    * the number of the checks
510    * */
511   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
512
513   if(action[3]) {
514     root= atoi(action[3]);
515     if(action[4]) {
516       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
517     }
518   }
519
520   int rank = smpi_process_index();
521   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
522
523   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
524   extra->type = TRACING_BCAST;
525   extra->send_size = size;
526   extra->root = root_traced;
527   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
528   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
529   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
530
531   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
532
533   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
534   log_timed_action (action, clock);
535 }
536
537 static void action_reduce(const char *const *action)
538 {
539   CHECK_ACTION_PARAMS(action, 2, 2);
540   double comm_size = parse_double(action[2]);
541   double comp_size = parse_double(action[3]);
542   double clock = smpi_process_simulated_elapsed();
543   int root=0;
544   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
545
546   if(action[4]) {
547     root= atoi(action[4]);
548     if(action[5]) {
549       MPI_CURRENT_TYPE=decode_datatype(action[5]);
550     }
551   }
552   
553   
554
555   int rank = smpi_process_index();
556   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
557   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
558   extra->type = TRACING_REDUCE;
559   extra->send_size = comm_size;
560   extra->comp_size = comp_size;
561   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
562   extra->root = root_traced;
563
564   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
565
566   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
567   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
568    mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
569    smpi_execute_flops(comp_size);
570
571   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
572   log_timed_action (action, clock);
573 }
574
575 static void action_allReduce(const char *const *action) {
576   CHECK_ACTION_PARAMS(action, 2, 1);
577   double comm_size = parse_double(action[2]);
578   double comp_size = parse_double(action[3]);
579
580   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
581   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
582
583   double clock = smpi_process_simulated_elapsed();
584   int rank = smpi_process_index();
585   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
586   extra->type = TRACING_ALLREDUCE;
587   extra->send_size = comm_size;
588   extra->comp_size = comp_size;
589   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
590   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
591
592   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
593   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
594   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
595   smpi_execute_flops(comp_size);
596
597   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
598   log_timed_action (action, clock);
599 }
600
601 static void action_allToAll(const char *const *action) {
602   double clock = smpi_process_simulated_elapsed();
603   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
604   int send_size = parse_double(action[2]);
605   int recv_size = parse_double(action[3]);
606   MPI_Datatype MPI_CURRENT_TYPE2;
607
608   if(action[4]) {
609     MPI_CURRENT_TYPE=decode_datatype(action[4]);
610     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
611   }
612   else {
613     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
614     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
615   }
616   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
617   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
618
619   int rank = smpi_process_index();
620   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
621   extra->type = TRACING_ALLTOALL;
622   extra->send_size = send_size;
623   extra->recv_size = recv_size;
624   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
625   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
626
627   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
628
629   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
630
631   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
632   log_timed_action (action, clock);
633 }
634
635
636 static void action_gather(const char *const *action) {
637   /*
638  The structure of the gather action for the rank 0 (total 4 processes) 
639  is the following:   
640  0 gather 68 68 0 0 0
641
642   where: 
643   1) 68 is the sendcounts
644   2) 68 is the recvcounts
645   3) 0 is the root node
646   4) 0 is the send datatype id, see decode_datatype()
647   5) 0 is the recv datatype id, see decode_datatype()
648   */
649   CHECK_ACTION_PARAMS(action, 2, 3);
650   double clock = smpi_process_simulated_elapsed();
651   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
652   int send_size = parse_double(action[2]);
653   int recv_size = parse_double(action[3]);
654   MPI_Datatype MPI_CURRENT_TYPE2;
655   if(action[5]) {
656     MPI_CURRENT_TYPE=decode_datatype(action[5]);
657     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
658   } else {
659     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
660     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
661   }
662   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
663   void *recv = NULL;
664   int root=0;
665   if(action[4])
666     root=atoi(action[4]);
667   int rank = smpi_comm_rank(MPI_COMM_WORLD);
668
669   if(rank==root)
670     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
671
672   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
673   extra->type = TRACING_GATHER;
674   extra->send_size = send_size;
675   extra->recv_size = recv_size;
676   extra->root = root;
677   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
678   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
679
680   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
681
682   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
683                 recv, recv_size, MPI_CURRENT_TYPE2,
684                 root, MPI_COMM_WORLD);
685
686   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
687   log_timed_action (action, clock);
688 }
689
690
691
692 static void action_gatherv(const char *const *action) {
693   /*
694  The structure of the gatherv action for the rank 0 (total 4 processes)
695  is the following:
696  0 gather 68 68 10 10 10 0 0 0
697
698   where:
699   1) 68 is the sendcount
700   2) 68 10 10 10 is the recvcounts
701   3) 0 is the root node
702   4) 0 is the send datatype id, see decode_datatype()
703   5) 0 is the recv datatype id, see decode_datatype()
704   */
705
706   double clock = smpi_process_simulated_elapsed();
707   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
708   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
709   int send_size = parse_double(action[2]);
710   int *disps = xbt_new0(int, comm_size);
711   int *recvcounts = xbt_new0(int, comm_size);
712   int i=0,recv_sum=0;
713
714   MPI_Datatype MPI_CURRENT_TYPE2;
715   if(action[4+comm_size]) {
716     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
717     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
718   } else {
719     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
720     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
721   }
722   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
723   void *recv = NULL;
724   for(i=0;i<comm_size;i++) {
725     recvcounts[i] = atoi(action[i+3]);
726     recv_sum=recv_sum+recvcounts[i];
727     disps[i] = 0;
728   }
729
730   int root=atoi(action[3+comm_size]);
731   int rank = smpi_comm_rank(MPI_COMM_WORLD);;
732
733   if(rank==root)
734     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
735
736   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
737   extra->type = TRACING_GATHERV;
738   extra->send_size = send_size;
739   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
740   for(i=0; i< comm_size; i++)//copy data to avoid bad free
741     extra->recvcounts[i] = recvcounts[i];
742   extra->root = root;
743   extra->num_processes = comm_size;
744   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
745   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
746
747   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
748
749   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
750                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
751                 root, MPI_COMM_WORLD);
752
753   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
754   log_timed_action (action, clock);
755   xbt_free(recvcounts);
756   xbt_free(disps);
757 }
758
759 static void action_reducescatter(const char *const *action) {
760
761     /*
762  The structure of the reducescatter action for the rank 0 (total 4 processes) 
763  is the following:   
764 0 reduceScatter 275427 275427 275427 204020 11346849 0
765
766   where: 
767   1) The first four values after the name of the action declare the recvcounts array
768   2) The value 11346849 is the amount of instructions
769   3) The last value corresponds to the datatype, see decode_datatype().
770
771   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
772
773    */
774
775   double clock = smpi_process_simulated_elapsed();
776   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
777   CHECK_ACTION_PARAMS(action, comm_size+1, 1);
778   int comp_size = parse_double(action[2+comm_size]);
779   int *recvcounts = xbt_new0(int, comm_size);  
780   int *disps = xbt_new0(int, comm_size);  
781   int i=0;
782   int rank = smpi_process_index();
783   int size = 0;
784   if(action[3+comm_size])
785     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
786   else
787     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
788
789   for(i=0;i<comm_size;i++) {
790     recvcounts[i] = atoi(action[i+2]);
791     disps[i] = 0;
792     size+=recvcounts[i];
793   }
794
795   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
796   extra->type = TRACING_REDUCE_SCATTER;
797   extra->send_size = 0;
798   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
799   for(i=0; i< comm_size; i++)//copy data to avoid bad free
800     extra->recvcounts[i] = recvcounts[i];
801   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
802   extra->comp_size = comp_size;
803   extra->num_processes = comm_size;
804
805   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
806
807   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
808   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
809    
810    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
811        MPI_COMM_WORLD);
812    smpi_execute_flops(comp_size);
813
814
815   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
816   xbt_free(recvcounts);
817   xbt_free(disps);
818   log_timed_action (action, clock);
819 }
820
821 static void action_allgather(const char *const *action) {
822   /*
823  The structure of the allgather action for the rank 0 (total 4 processes) 
824  is the following:   
825   0 allGather 275427 275427
826
827   where: 
828   1) 275427 is the sendcount
829   2) 275427 is the recvcount
830   3) No more values mean that the datatype for sent and receive buffer
831   is the default one, see decode_datatype().
832
833    */
834
835   double clock = smpi_process_simulated_elapsed();
836
837   CHECK_ACTION_PARAMS(action, 2, 2);
838   int sendcount=atoi(action[2]); 
839   int recvcount=atoi(action[3]); 
840
841   MPI_Datatype MPI_CURRENT_TYPE2;
842
843   if(action[4]) {
844     MPI_CURRENT_TYPE = decode_datatype(action[3]);
845     MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
846   } else {
847     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
848     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
849   }
850   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
851   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
852
853   int rank = smpi_process_index();
854   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
855   extra->type = TRACING_ALLGATHER;
856   extra->send_size = sendcount;
857   extra->recv_size= recvcount;
858   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
859   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
860   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
861
862   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
863
864   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
865
866   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
867   log_timed_action (action, clock);
868 }
869
870 static void action_allgatherv(const char *const *action) {
871
872   /*
873  The structure of the allgatherv action for the rank 0 (total 4 processes) 
874  is the following:   
875 0 allGatherV 275427 275427 275427 275427 204020
876
877   where: 
878   1) 275427 is the sendcount
879   2) The next four elements declare the recvcounts array
880   3) No more values mean that the datatype for sent and receive buffer
881   is the default one, see decode_datatype().
882
883    */
884
885   double clock = smpi_process_simulated_elapsed();
886
887   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
888   CHECK_ACTION_PARAMS(action, comm_size+1, 2);
889   int i=0;
890   int sendcount=atoi(action[2]);
891   int *recvcounts = xbt_new0(int, comm_size);  
892   int *disps = xbt_new0(int, comm_size);  
893   int recv_sum=0;  
894   MPI_Datatype MPI_CURRENT_TYPE2;
895
896   if(action[3+comm_size]) {
897     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
898     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
899   } else {
900     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
901     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
902   }
903   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
904
905   for(i=0;i<comm_size;i++) {
906     recvcounts[i] = atoi(action[i+3]);
907     recv_sum=recv_sum+recvcounts[i];
908   }
909   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
910
911   int rank = smpi_process_index();
912   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
913   extra->type = TRACING_ALLGATHERV;
914   extra->send_size = sendcount;
915   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
916   for(i=0; i< comm_size; i++)//copy data to avoid bad free
917     extra->recvcounts[i] = recvcounts[i];
918   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
919   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
920   extra->num_processes = comm_size;
921
922   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
923
924   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
925
926   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
927   log_timed_action (action, clock);
928   xbt_free(recvcounts);
929   xbt_free(disps);
930 }
931
932 static void action_allToAllv(const char *const *action) {
933   /*
934  The structure of the allToAllV action for the rank 0 (total 4 processes) 
935  is the following:   
936   0 allToAllV 100 1 7 10 12 100 1 70 10 5
937
938   where: 
939   1) 100 is the size of the send buffer *sizeof(int),
940   2) 1 7 10 12 is the sendcounts array
941   3) 100*sizeof(int) is the size of the receiver buffer
942   4)  1 70 10 5 is the recvcounts array
943
944    */
945
946
947   double clock = smpi_process_simulated_elapsed();
948
949   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
950   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
951   int send_buf_size=0,recv_buf_size=0,i=0;
952   int *sendcounts = xbt_new0(int, comm_size);  
953   int *recvcounts = xbt_new0(int, comm_size);  
954   int *senddisps = xbt_new0(int, comm_size);  
955   int *recvdisps = xbt_new0(int, comm_size);  
956
957   MPI_Datatype MPI_CURRENT_TYPE2;
958
959   send_buf_size=parse_double(action[2]);
960   recv_buf_size=parse_double(action[3+comm_size]);
961   if(action[4+2*comm_size]) {
962     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
963     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
964   }
965   else {
966       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
967       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
968   }
969
970   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
971   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
972
973   for(i=0;i<comm_size;i++) {
974     sendcounts[i] = atoi(action[i+3]);
975     recvcounts[i] = atoi(action[i+4+comm_size]);
976   }
977
978
979   int rank = smpi_process_index();
980   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
981   extra->type = TRACING_ALLTOALLV;
982   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
983   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
984   extra->num_processes = comm_size;
985
986   for(i=0; i< comm_size; i++){//copy data to avoid bad free
987     extra->send_size += sendcounts[i];
988     extra->sendcounts[i] = sendcounts[i];
989     extra->recv_size += recvcounts[i];
990     extra->recvcounts[i] = recvcounts[i];
991   }
992   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
993   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
994
995   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
996
997   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
998                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
999                                MPI_COMM_WORLD);
1000
1001   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1002   log_timed_action (action, clock);
1003   xbt_free(sendcounts);
1004   xbt_free(recvcounts);
1005   xbt_free(senddisps);
1006   xbt_free(recvdisps);
1007 }
1008
1009 void smpi_replay_init(int *argc, char***argv){
1010   smpi_process_init(argc, argv);
1011   smpi_process_mark_as_initialized();
1012   smpi_process_set_replaying(1);
1013
1014   int rank = smpi_process_index();
1015   TRACE_smpi_init(rank);
1016   TRACE_smpi_computing_init(rank);
1017   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1018   extra->type = TRACING_INIT;
1019   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1020   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1021
1022   if (!smpi_process_index()){
1023     _xbt_replay_action_init();
1024     xbt_replay_action_register("init",       action_init);
1025     xbt_replay_action_register("finalize",   action_finalize);
1026     xbt_replay_action_register("comm_size",  action_comm_size);
1027     xbt_replay_action_register("comm_split", action_comm_split);
1028     xbt_replay_action_register("comm_dup",   action_comm_dup);
1029     xbt_replay_action_register("send",       action_send);
1030     xbt_replay_action_register("Isend",      action_Isend);
1031     xbt_replay_action_register("recv",       action_recv);
1032     xbt_replay_action_register("Irecv",      action_Irecv);
1033     xbt_replay_action_register("test",       action_test);
1034     xbt_replay_action_register("wait",       action_wait);
1035     xbt_replay_action_register("waitAll",    action_waitall);
1036     xbt_replay_action_register("barrier",    action_barrier);
1037     xbt_replay_action_register("bcast",      action_bcast);
1038     xbt_replay_action_register("reduce",     action_reduce);
1039     xbt_replay_action_register("allReduce",  action_allReduce);
1040     xbt_replay_action_register("allToAll",   action_allToAll);
1041     xbt_replay_action_register("allToAllV",  action_allToAllv);
1042     xbt_replay_action_register("gather",  action_gather);
1043     xbt_replay_action_register("gatherV",  action_gatherv);
1044     xbt_replay_action_register("allGather",  action_allgather);
1045     xbt_replay_action_register("allGatherV",  action_allgatherv);
1046     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1047     xbt_replay_action_register("compute",    action_compute);
1048   }
1049   
1050   //if we have a delayed start, sleep here.
1051   if(*argc>2){
1052     char *endptr;
1053     double value = strtod((*argv)[2], &endptr);
1054     if (*endptr != '\0')
1055       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1056     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1057     smpi_execute_flops(value);
1058   }
1059   xbt_replay_action_runner(*argc, *argv);
1060 }
1061
1062 int smpi_replay_finalize(){
1063   double sim_time= 1.;
1064   /* One active process will stop. Decrease the counter*/
1065   XBT_DEBUG("There are %lu elements in reqq[*]",
1066             xbt_dynar_length(reqq[smpi_process_index()]));
1067   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1068     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1069     MPI_Request requests[count_requests];
1070     MPI_Status status[count_requests];
1071     unsigned int i;
1072
1073     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1074     smpi_mpi_waitall(count_requests, requests, status);
1075     active_processes--;
1076   } else {
1077     active_processes--;
1078   }
1079
1080   if(!active_processes){
1081     /* Last process alive speaking */
1082     /* end the simulated timer */
1083     sim_time = smpi_process_simulated_elapsed();
1084   }
1085   
1086
1087   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1088
1089   if(!active_processes){
1090     XBT_INFO("Simulation time %f", sim_time);
1091     _xbt_replay_action_exit();
1092     xbt_free(sendbuffer);
1093     xbt_free(recvbuffer);
1094     xbt_free(reqq);
1095     reqq = NULL;
1096   }
1097   
1098
1099   int rank = smpi_process_index();
1100   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1101   extra->type = TRACING_FINALIZE;
1102   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1103
1104   smpi_process_finalize();
1105
1106   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1107   TRACE_smpi_finalize(smpi_process_index());
1108   smpi_process_destroy();
1109   return MPI_SUCCESS;
1110 }