Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a first draft for automatic privatization of global variables for smpi.
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static void log_timed_action (const char *const *action, double clock){
22   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
23     char *name = xbt_str_join_array(action, " ");
24     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
25     free(name);
26   }
27 }
28
29 /* Helper function */
30 static double parse_double(const char *string)
31 {
32   double value;
33   char *endptr;
34   value = strtod(string, &endptr);
35   if (*endptr != '\0')
36     THROWF(unknown_error, 0, "%s is not a double", string);
37   return value;
38 }
39
40 static MPI_Datatype decode_datatype(const char *const action)
41 {
42 // Declared datatypes,
43
44   switch(atoi(action))
45   {
46     case 0:
47       MPI_CURRENT_TYPE=MPI_DOUBLE;
48       break;
49     case 1:
50       MPI_CURRENT_TYPE=MPI_INT;
51       break;
52     case 2:
53       MPI_CURRENT_TYPE=MPI_CHAR;
54       break;
55     case 3:
56       MPI_CURRENT_TYPE=MPI_SHORT;
57       break;
58     case 4:
59       MPI_CURRENT_TYPE=MPI_LONG;
60       break;
61     case 5:
62       MPI_CURRENT_TYPE=MPI_FLOAT;
63       break;
64     case 6:
65       MPI_CURRENT_TYPE=MPI_BYTE;
66       break;
67     default:
68       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
69
70   }
71    return MPI_CURRENT_TYPE;
72 }
73
74
75 const char* encode_datatype(MPI_Datatype datatype)
76 {
77
78   //default type for output is set to MPI_BYTE
79   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
80   if (datatype==MPI_BYTE){
81       return "";
82   }
83   if(datatype==MPI_DOUBLE)
84       return "0";
85   if(datatype==MPI_INT)
86       return "1";
87   if(datatype==MPI_CHAR)
88       return "2";
89   if(datatype==MPI_SHORT)
90       return "3";
91   if(datatype==MPI_LONG)
92     return "4";
93   if(datatype==MPI_FLOAT)
94       return "5";
95
96   // default - not implemented.
97   // do not warn here as we pass in this function even for other trace formats
98   return "-1";
99 }
100
101 static void action_init(const char *const *action)
102 {
103   int i;
104   XBT_DEBUG("Initialize the counters");
105
106   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
107   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
108
109   /* start a simulated timer */
110   smpi_process_simulated_start();
111   /*initialize the number of active processes */
112   active_processes = smpi_process_count();
113
114   if (!reqq) {
115     reqq=xbt_new0(xbt_dynar_t,active_processes);
116
117     for(i=0;i<active_processes;i++){
118       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
119     }
120   }
121 }
122
123 static void action_finalize(const char *const *action)
124 {
125 }
126
127 static void action_comm_size(const char *const *action)
128 {
129   double clock = smpi_process_simulated_elapsed();
130
131   communicator_size = parse_double(action[2]);
132   log_timed_action (action, clock);
133 }
134
135 static void action_comm_split(const char *const *action)
136 {
137   double clock = smpi_process_simulated_elapsed();
138
139   log_timed_action (action, clock);
140 }
141
142 static void action_comm_dup(const char *const *action)
143 {
144   double clock = smpi_process_simulated_elapsed();
145
146   log_timed_action (action, clock);
147 }
148
149 static void action_compute(const char *const *action)
150 {
151   double clock = smpi_process_simulated_elapsed();
152   double flops= parse_double(action[2]);
153 #ifdef HAVE_TRACING
154   int rank = smpi_comm_rank(MPI_COMM_WORLD);
155   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
156   extra->type=TRACING_COMPUTING;
157   extra->comp_size=flops;
158   TRACE_smpi_computing_in(rank, extra);
159 #endif
160   smpi_execute_flops(flops);
161 #ifdef HAVE_TRACING
162   TRACE_smpi_computing_out(rank);
163 #endif
164
165   log_timed_action (action, clock);
166 }
167
168 static void action_send(const char *const *action)
169 {
170   int to = atoi(action[2]);
171   double size=parse_double(action[3]);
172   double clock = smpi_process_simulated_elapsed();
173
174   if(action[4]) {
175     MPI_CURRENT_TYPE=decode_datatype(action[4]);
176   } else {
177     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
178   }
179
180 #ifdef HAVE_TRACING
181   int rank = smpi_comm_rank(MPI_COMM_WORLD);
182
183   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
184   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
185   extra->type = TRACING_SEND;
186   extra->send_size = size;
187   extra->src = rank;
188   extra->dst = dst_traced;
189   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
190   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
191   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
192 #endif
193
194   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
195
196   log_timed_action (action, clock);
197
198   #ifdef HAVE_TRACING
199   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
200 #endif
201
202 }
203
204 static void action_Isend(const char *const *action)
205 {
206   int to = atoi(action[2]);
207   double size=parse_double(action[3]);
208   double clock = smpi_process_simulated_elapsed();
209   MPI_Request request;
210
211   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
212   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
213
214 #ifdef HAVE_TRACING
215   int rank = smpi_comm_rank(MPI_COMM_WORLD);
216   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
217   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
218   extra->type = TRACING_ISEND;
219   extra->send_size = size;
220   extra->src = rank;
221   extra->dst = dst_traced;
222   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
223   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
224   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 #endif
226
227   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
228
229 #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231   request->send = 1;
232 #endif
233
234   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
235
236   log_timed_action (action, clock);
237 }
238
239 static void action_recv(const char *const *action) {
240   int from = atoi(action[2]);
241   double size=parse_double(action[3]);
242   double clock = smpi_process_simulated_elapsed();
243   MPI_Status status;
244
245   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
246   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
247
248 #ifdef HAVE_TRACING
249   int rank = smpi_comm_rank(MPI_COMM_WORLD);
250   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
251
252   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253   extra->type = TRACING_RECV;
254   extra->send_size = size;
255   extra->src = src_traced;
256   extra->dst = rank;
257   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
258   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
259 #endif
260
261   //unknow size from the receiver pov
262   if(size==-1){
263       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
264       size=status.count;
265   }
266
267   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
268
269 #ifdef HAVE_TRACING
270   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
271   TRACE_smpi_recv(rank, src_traced, rank);
272 #endif
273
274   log_timed_action (action, clock);
275 }
276
277 static void action_Irecv(const char *const *action)
278 {
279   int from = atoi(action[2]);
280   double size=parse_double(action[3]);
281   double clock = smpi_process_simulated_elapsed();
282   MPI_Request request;
283
284   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
285   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286
287 #ifdef HAVE_TRACING
288   int rank = smpi_comm_rank(MPI_COMM_WORLD);
289   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
290   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291   extra->type = TRACING_IRECV;
292   extra->send_size = size;
293   extra->src = src_traced;
294   extra->dst = rank;
295   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
296   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
297 #endif
298   MPI_Status status;
299   //unknow size from the receiver pov
300   if(size==-1){
301       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
302       size=status.count;
303   }
304
305   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
306
307 #ifdef HAVE_TRACING
308   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
309   request->recv = 1;
310 #endif
311   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
312
313   log_timed_action (action, clock);
314 }
315
316 static void action_wait(const char *const *action){
317   double clock = smpi_process_simulated_elapsed();
318   MPI_Request request;
319   MPI_Status status;
320
321   xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
322       "action wait not preceded by any irecv or isend: %s",
323       xbt_str_join_array(action," "));
324   request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
325   xbt_assert(request != NULL, "found null request in reqq");
326 #ifdef HAVE_TRACING
327   int rank = request->comm != MPI_COMM_NULL
328       ? smpi_comm_rank(request->comm)
329       : -1;
330
331   MPI_Group group = smpi_comm_group(request->comm);
332   int src_traced = smpi_group_rank(group, request->src);
333   int dst_traced = smpi_group_rank(group, request->dst);
334   int is_wait_for_receive = request->recv;
335   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
336   extra->type = TRACING_WAIT;
337   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
338 #endif
339   smpi_mpi_wait(&request, &status);
340 #ifdef HAVE_TRACING
341   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
342   if (is_wait_for_receive) {
343     TRACE_smpi_recv(rank, src_traced, dst_traced);
344   }
345 #endif
346
347   log_timed_action (action, clock);
348 }
349
350 static void action_waitall(const char *const *action){
351   double clock = smpi_process_simulated_elapsed();
352   int count_requests=0;
353   unsigned int i=0;
354
355   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
356
357   if (count_requests>0) {
358     MPI_Request requests[count_requests];
359     MPI_Status status[count_requests];
360
361     /*  The reqq is an array of dynars. Its index corresponds to the rank.
362      Thus each rank saves its own requests to the array request. */
363     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
364
365   #ifdef HAVE_TRACING
366    //save information from requests
367
368    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
369    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
370    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
371    for (i = 0; i < count_requests; i++) {
372     if(requests[i]){
373       int *asrc = xbt_new(int, 1);
374       int *adst = xbt_new(int, 1);
375       int *arecv = xbt_new(int, 1);
376       *asrc = requests[i]->src;
377       *adst = requests[i]->dst;
378       *arecv = requests[i]->recv;
379       xbt_dynar_insert_at(srcs, i, asrc);
380       xbt_dynar_insert_at(dsts, i, adst);
381       xbt_dynar_insert_at(recvs, i, arecv);
382       xbt_free(asrc);
383       xbt_free(adst);
384       xbt_free(arecv);
385     }else {
386       int *t = xbt_new(int, 1);
387       xbt_dynar_insert_at(srcs, i, t);
388       xbt_dynar_insert_at(dsts, i, t);
389       xbt_dynar_insert_at(recvs, i, t);
390       xbt_free(t);
391     }
392    }
393    int rank_traced = smpi_process_index();
394    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
395    extra->type = TRACING_WAITALL;
396    extra->send_size=count_requests;
397    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
398  #endif
399
400     smpi_mpi_waitall(count_requests, requests, status);
401
402   #ifdef HAVE_TRACING
403    for (i = 0; i < count_requests; i++) {
404     int src_traced, dst_traced, is_wait_for_receive;
405     xbt_dynar_get_cpy(srcs, i, &src_traced);
406     xbt_dynar_get_cpy(dsts, i, &dst_traced);
407     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
408     if (is_wait_for_receive) {
409       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
410     }
411    }
412    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
413    //clean-up of dynars
414    xbt_dynar_free(&srcs);
415    xbt_dynar_free(&dsts);
416    xbt_dynar_free(&recvs);
417   #endif
418
419    xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
420   }
421   log_timed_action (action, clock);
422 }
423
424 static void action_barrier(const char *const *action){
425   double clock = smpi_process_simulated_elapsed();
426 #ifdef HAVE_TRACING
427   int rank = smpi_comm_rank(MPI_COMM_WORLD);
428   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
429   extra->type = TRACING_BARRIER;
430   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
431 #endif
432   smpi_mpi_barrier(MPI_COMM_WORLD);
433 #ifdef HAVE_TRACING
434   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
435 #endif
436
437   log_timed_action (action, clock);
438 }
439
440
441 static void action_bcast(const char *const *action)
442 {
443   double size = parse_double(action[2]);
444   double clock = smpi_process_simulated_elapsed();
445   int root=0;
446   /*
447    * Initialize MPI_CURRENT_TYPE in order to decrease
448    * the number of the checks
449    * */
450   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
451
452   if(action[3]) {
453     root= atoi(action[3]);
454     if(action[4]) {
455       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
456     }
457   }
458
459 #ifdef HAVE_TRACING
460   int rank = smpi_comm_rank(MPI_COMM_WORLD);
461   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
462
463   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
464   extra->type = TRACING_BCAST;
465   extra->send_size = size;
466   extra->root = root_traced;
467   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
468   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
469
470 #endif
471
472   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
473 #ifdef HAVE_TRACING
474   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
475 #endif
476
477   log_timed_action (action, clock);
478 }
479
480 static void action_reduce(const char *const *action)
481 {
482   double comm_size = parse_double(action[2]);
483   double comp_size = parse_double(action[3]);
484   double clock = smpi_process_simulated_elapsed();
485   int root=0;
486   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
487
488   if(action[4]) {
489     root= atoi(action[4]);
490     if(action[5]) {
491       MPI_CURRENT_TYPE=decode_datatype(action[5]);
492     }
493   }
494
495 #ifdef HAVE_TRACING
496   int rank = smpi_comm_rank(MPI_COMM_WORLD);
497   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
498   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
499   extra->type = TRACING_REDUCE;
500   extra->send_size = comm_size;
501   extra->comp_size = comp_size;
502   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
503   extra->root = root_traced;
504
505   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
506 #endif
507    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
508    smpi_execute_flops(comp_size);
509 #ifdef HAVE_TRACING
510   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
511 #endif
512
513   log_timed_action (action, clock);
514 }
515
516 static void action_allReduce(const char *const *action) {
517   double comm_size = parse_double(action[2]);
518   double comp_size = parse_double(action[3]);
519
520   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
521   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
522
523   double clock = smpi_process_simulated_elapsed();
524 #ifdef HAVE_TRACING
525   int rank = smpi_comm_rank(MPI_COMM_WORLD);
526   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
527   extra->type = TRACING_ALLREDUCE;
528   extra->send_size = comm_size;
529   extra->comp_size = comp_size;
530   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
531
532   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
533 #endif
534   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
535   smpi_execute_flops(comp_size);
536   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
537 #ifdef HAVE_TRACING
538   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
539 #endif
540
541   log_timed_action (action, clock);
542 }
543
544 static void action_allToAll(const char *const *action) {
545   double clock = smpi_process_simulated_elapsed();
546   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
547   int send_size = parse_double(action[2]);
548   int recv_size = parse_double(action[3]);
549   MPI_Datatype MPI_CURRENT_TYPE2;
550
551   if(action[4]) {
552     MPI_CURRENT_TYPE=decode_datatype(action[4]);
553     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
554   }
555   else {
556     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
557     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
558   }
559   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
560   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
561
562 #ifdef HAVE_TRACING
563   int rank = smpi_process_index();
564   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
565   extra->type = TRACING_ALLTOALL;
566   extra->send_size = send_size;
567   extra->recv_size = recv_size;
568   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
569   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
570
571   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
572 #endif
573
574   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
575
576 #ifdef HAVE_TRACING
577   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
578 #endif
579
580   log_timed_action (action, clock);
581   xbt_free(send);
582   xbt_free(recv);
583 }
584
585
586 static void action_gather(const char *const *action) {
587   /*
588  The structure of the gather action for the rank 0 (total 4 processes) 
589  is the following:   
590  0 gather 68 68 0 0 0
591
592   where: 
593   1) 68 is the sendcounts
594   2) 68 is the recvcounts
595   3) 0 is the root node
596   4) 0 is the send datatype id, see decode_datatype()
597   5) 0 is the recv datatype id, see decode_datatype()
598   */
599   double clock = smpi_process_simulated_elapsed();
600   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
601   int send_size = parse_double(action[2]);
602   int recv_size = parse_double(action[3]);
603   MPI_Datatype MPI_CURRENT_TYPE2;
604   if(action[5]) {
605     MPI_CURRENT_TYPE=decode_datatype(action[5]);
606     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
607   } else {
608     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
609     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
610   }
611   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
612   void *recv = NULL;
613
614   int root=atoi(action[4]);
615   int rank = smpi_process_index();
616
617   if(rank==root)
618     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
619
620 #ifdef HAVE_TRACING
621   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
622   extra->type = TRACING_GATHER;
623   extra->send_size = send_size;
624   extra->recv_size = recv_size;
625   extra->root = root;
626   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
627   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
628
629   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
630 #endif
631 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
632                 recv, recv_size, MPI_CURRENT_TYPE2,
633                 root, MPI_COMM_WORLD);
634
635 #ifdef HAVE_TRACING
636   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
637 #endif
638
639   log_timed_action (action, clock);
640   xbt_free(send);
641   xbt_free(recv);
642 }
643
644
645
646 static void action_gatherv(const char *const *action) {
647   /*
648  The structure of the gatherv action for the rank 0 (total 4 processes)
649  is the following:
650  0 gather 68 68 10 10 10 0 0 0
651
652   where:
653   1) 68 is the sendcount
654   2) 68 10 10 10 is the recvcounts
655   3) 0 is the root node
656   4) 0 is the send datatype id, see decode_datatype()
657   5) 0 is the recv datatype id, see decode_datatype()
658   */
659   double clock = smpi_process_simulated_elapsed();
660   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
661   int send_size = parse_double(action[2]);
662   int *disps = xbt_new0(int, comm_size);
663   int *recvcounts = xbt_new0(int, comm_size);
664   int i=0,recv_sum=0;
665
666   MPI_Datatype MPI_CURRENT_TYPE2;
667   if(action[4+comm_size]) {
668     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
669     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
670   } else {
671     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
672     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
673   }
674   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
675   void *recv = NULL;
676   for(i=0;i<comm_size;i++) {
677     recvcounts[i] = atoi(action[i+3]);
678     recv_sum=recv_sum+recvcounts[i];
679     disps[i] = 0;
680   }
681
682   int root=atoi(action[3+comm_size]);
683   int rank = smpi_process_index();
684
685   if(rank==root)
686     recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
687
688 #ifdef HAVE_TRACING
689   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
690   extra->type = TRACING_GATHERV;
691   extra->send_size = send_size;
692   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
693   for(i=0; i< comm_size; i++)//copy data to avoid bad free
694     extra->recvcounts[i] = recvcounts[i];
695   extra->root = root;
696   extra->num_processes = comm_size;
697   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
698   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
699
700   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
701 #endif
702 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
703                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
704                 root, MPI_COMM_WORLD);
705
706 #ifdef HAVE_TRACING
707   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
708 #endif
709
710   log_timed_action (action, clock);
711   xbt_free(recvcounts);
712   xbt_free(send);
713   xbt_free(recv);
714   xbt_free(disps);
715
716 }
717
718 static void action_reducescatter(const char *const *action) {
719
720     /*
721  The structure of the reducescatter action for the rank 0 (total 4 processes) 
722  is the following:   
723 0 reduceScatter 275427 275427 275427 204020 11346849 0
724
725   where: 
726   1) The first four values after the name of the action declare the recvcounts array
727   2) The value 11346849 is the amount of instructions
728   3) The last value corresponds to the datatype, see decode_datatype().
729
730   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
731
732    */
733
734   double clock = smpi_process_simulated_elapsed();
735   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
736   int comp_size = parse_double(action[2+comm_size]);
737   int *recvcounts = xbt_new0(int, comm_size);  
738   int *disps = xbt_new0(int, comm_size);  
739   int i=0,recv_sum=0;
740   int root=0;
741   int rank = smpi_process_index();
742
743   if(action[3+comm_size])
744     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
745   else
746     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
747
748   for(i=0;i<comm_size;i++) {
749     recvcounts[i] = atoi(action[i+2]);
750     recv_sum=recv_sum+recvcounts[i];
751     disps[i] = 0;
752   }
753
754 #ifdef HAVE_TRACING
755   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
756   extra->type = TRACING_REDUCE_SCATTER;
757   extra->send_size = 0;
758   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
759   for(i=0; i< comm_size; i++)//copy data to avoid bad free
760     extra->recvcounts[i] = recvcounts[i];
761   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
762   extra->comp_size = comp_size;
763   extra->num_processes = comm_size;
764
765
766   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
767 #endif
768    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
769        root, MPI_COMM_WORLD);
770    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
771                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
772    smpi_execute_flops(comp_size);
773
774
775 #ifdef HAVE_TRACING
776   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
777 #endif
778   xbt_free(recvcounts);
779   xbt_free(disps);
780   log_timed_action (action, clock);
781 }
782
783
784 static void action_allgatherv(const char *const *action) {
785
786   /*
787  The structure of the allgatherv action for the rank 0 (total 4 processes) 
788  is the following:   
789 0 allGatherV 275427 275427 275427 275427 204020
790
791   where: 
792   1) 275427 is the sendcount
793   2) The next four elements declare the recvcounts array
794   3) No more values mean that the datatype for sent and receive buffer
795   is the default one, see decode_datatype().
796
797    */
798
799   double clock = smpi_process_simulated_elapsed();
800
801   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
802   int i=0;
803   int sendcount=atoi(action[2]);
804   int *recvcounts = xbt_new0(int, comm_size);  
805   int *disps = xbt_new0(int, comm_size);  
806   int recv_sum=0;  
807   MPI_Datatype MPI_CURRENT_TYPE2;
808
809   if(action[3+comm_size]) {
810     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
811     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
812   } else {
813     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
814     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
815   }
816   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
817
818   for(i=0;i<comm_size;i++) {
819     recvcounts[i] = atoi(action[i+3]);
820     recv_sum=recv_sum+recvcounts[i];
821   }
822   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
823
824 #ifdef HAVE_TRACING
825   int rank = smpi_process_index();
826   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
827   extra->type = TRACING_ALLGATHERV;
828   extra->send_size = sendcount;
829   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
830   for(i=0; i< comm_size; i++)//copy data to avoid bad free
831     extra->recvcounts[i] = recvcounts[i];
832   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
833   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
834   extra->num_processes = comm_size;
835
836   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
837 #endif
838
839 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
840
841 #ifdef HAVE_TRACING
842   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
843 #endif
844
845   log_timed_action (action, clock);
846   xbt_free(sendbuf);
847   xbt_free(recvbuf);
848   xbt_free(recvcounts);
849   xbt_free(disps);
850 }
851
852
853 static void action_allToAllv(const char *const *action) {
854   /*
855  The structure of the allToAllV action for the rank 0 (total 4 processes) 
856  is the following:   
857   0 allToAllV 100 1 7 10 12 100 1 70 10 5
858
859   where: 
860   1) 100 is the size of the send buffer *sizeof(int),
861   2) 1 7 10 12 is the sendcounts array
862   3) 100*sizeof(int) is the size of the receiver buffer
863   4)  1 70 10 5 is the recvcounts array
864
865    */
866
867
868   double clock = smpi_process_simulated_elapsed();
869
870   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
871   int send_buf_size=0,recv_buf_size=0,i=0;
872   int *sendcounts = xbt_new0(int, comm_size);  
873   int *recvcounts = xbt_new0(int, comm_size);  
874   int *senddisps = xbt_new0(int, comm_size);  
875   int *recvdisps = xbt_new0(int, comm_size);  
876
877   MPI_Datatype MPI_CURRENT_TYPE2;
878
879   send_buf_size=parse_double(action[2]);
880   recv_buf_size=parse_double(action[3+comm_size]);
881   if(action[4+2*comm_size]) {
882     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
883     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
884   }
885   else {
886       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
887       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
888   }
889
890   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
891   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
892
893   for(i=0;i<comm_size;i++) {
894     sendcounts[i] = atoi(action[i+3]);
895     recvcounts[i] = atoi(action[i+4+comm_size]);
896   }
897
898
899 #ifdef HAVE_TRACING
900   int rank = smpi_process_index();
901   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
902   extra->type = TRACING_ALLTOALLV;
903   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
904   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
905   extra->num_processes = comm_size;
906
907   for(i=0; i< comm_size; i++){//copy data to avoid bad free
908     extra->send_size += sendcounts[i];
909     extra->sendcounts[i] = sendcounts[i];
910     extra->recv_size += recvcounts[i];
911     extra->recvcounts[i] = recvcounts[i];
912   }
913   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
914   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
915
916   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
917 #endif
918     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
919                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
920                                MPI_COMM_WORLD);
921 #ifdef HAVE_TRACING
922   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
923 #endif
924
925   log_timed_action (action, clock);
926   xbt_free(sendbuf);
927   xbt_free(recvbuf);
928   xbt_free(sendcounts);
929   xbt_free(recvcounts);
930   xbt_free(senddisps);
931   xbt_free(recvdisps);
932 }
933
934 void smpi_replay_init(int *argc, char***argv){
935   smpi_process_init(argc, argv);
936   smpi_process_mark_as_initialized();
937 #ifdef HAVE_TRACING
938   int rank = smpi_process_index();
939   TRACE_smpi_init(rank);
940   TRACE_smpi_computing_init(rank);
941   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
942   extra->type = TRACING_INIT;
943   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
944   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
945 #endif
946
947   if (!smpi_process_index()){
948     _xbt_replay_action_init();
949     xbt_replay_action_register("init",       action_init);
950     xbt_replay_action_register("finalize",   action_finalize);
951     xbt_replay_action_register("comm_size",  action_comm_size);
952     xbt_replay_action_register("comm_split", action_comm_split);
953     xbt_replay_action_register("comm_dup",   action_comm_dup);
954     xbt_replay_action_register("send",       action_send);
955     xbt_replay_action_register("Isend",      action_Isend);
956     xbt_replay_action_register("recv",       action_recv);
957     xbt_replay_action_register("Irecv",      action_Irecv);
958     xbt_replay_action_register("wait",       action_wait);
959     xbt_replay_action_register("waitAll",    action_waitall);
960     xbt_replay_action_register("barrier",    action_barrier);
961     xbt_replay_action_register("bcast",      action_bcast);
962     xbt_replay_action_register("reduce",     action_reduce);
963     xbt_replay_action_register("allReduce",  action_allReduce);
964     xbt_replay_action_register("allToAll",   action_allToAll);
965     xbt_replay_action_register("allToAllV",  action_allToAllv);
966     xbt_replay_action_register("gather",  action_gather);
967     xbt_replay_action_register("gatherV",  action_gatherv);
968     xbt_replay_action_register("allGatherV",  action_allgatherv);
969     xbt_replay_action_register("reduceScatter",  action_reducescatter);
970     xbt_replay_action_register("compute",    action_compute);
971   }
972
973   xbt_replay_action_runner(*argc, *argv);
974 }
975
976 int smpi_replay_finalize(){
977   double sim_time= 1.;
978   /* One active process will stop. Decrease the counter*/
979   XBT_DEBUG("There are %lu elements in reqq[*]",
980             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
981   if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
982     int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
983     MPI_Request requests[count_requests];
984     MPI_Status status[count_requests];
985     unsigned int i;
986
987     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
988     smpi_mpi_waitall(count_requests, requests, status);
989     active_processes--;
990   } else {
991     active_processes--;
992   }
993
994   xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
995
996   if(!active_processes){
997     /* Last process alive speaking */
998     /* end the simulated timer */
999     sim_time = smpi_process_simulated_elapsed();
1000     XBT_INFO("Simulation time %f", sim_time);
1001     _xbt_replay_action_exit();
1002     xbt_free(reqq);
1003     reqq = NULL;
1004   }
1005   smpi_mpi_barrier(MPI_COMM_WORLD);
1006 #ifdef HAVE_TRACING
1007   int rank = smpi_process_index();
1008   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1009   extra->type = TRACING_FINALIZE;
1010   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1011 #endif
1012   smpi_process_finalize();
1013 #ifdef HAVE_TRACING
1014   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1015   TRACE_smpi_finalize(smpi_process_index());
1016 #endif
1017   smpi_process_destroy();
1018   return MPI_SUCCESS;
1019 }