Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
3d2f1be123839c44da01406249c4b65afa1c183c
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static void log_timed_action (const char *const *action, double clock){
23   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24     char *name = xbt_str_join_array(action, " ");
25     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
26     free(name);
27   }
28 }
29
30 typedef struct {
31   xbt_dynar_t irecvs; /* of MPI_Request */
32 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
33
34
35 /* Helper function */
36 static double parse_double(const char *string)
37 {
38   double value;
39   char *endptr;
40   value = strtod(string, &endptr);
41   if (*endptr != '\0')
42     THROWF(unknown_error, 0, "%s is not a double", string);
43   return value;
44 }
45
46 static MPI_Datatype decode_datatype(const char *const action)
47 {
48 // Declared datatypes,
49
50   switch(atoi(action))
51   {
52     case 0:
53       MPI_CURRENT_TYPE=MPI_DOUBLE;
54       break;
55     case 1:
56       MPI_CURRENT_TYPE=MPI_INT;
57       break;
58     case 2:
59       MPI_CURRENT_TYPE=MPI_CHAR;
60       break;
61     case 3:
62       MPI_CURRENT_TYPE=MPI_SHORT;
63       break;
64     case 4:
65       MPI_CURRENT_TYPE=MPI_LONG;
66       break;
67     case 5:
68       MPI_CURRENT_TYPE=MPI_FLOAT;
69       break;
70     case 6:
71       MPI_CURRENT_TYPE=MPI_BYTE;
72       break;
73     default:
74       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
75
76   }
77    return MPI_CURRENT_TYPE;
78 }
79
80 static void action_init(const char *const *action)
81 {
82   int i;
83   XBT_DEBUG("Initialize the counters");
84   smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
85   globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
86
87   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
88   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
89
90   smpi_process_set_user_data((void*) globals);
91
92   /* start a simulated timer */
93   smpi_process_simulated_start();
94   /*initialize the number of active processes */
95   active_processes = smpi_process_count();
96
97   if (!reqq) {
98     reqq=xbt_new0(xbt_dynar_t,active_processes);
99
100     for(i=0;i<active_processes;i++){
101       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),NULL);
102     }
103   }
104 }
105
106 static void action_finalize(const char *const *action)
107 {
108   smpi_replay_globals_t globals =
109       (smpi_replay_globals_t) smpi_process_get_user_data();
110   if (globals){
111     XBT_DEBUG("There are %lu irecvs in the dynar",
112          xbt_dynar_length(globals->irecvs));
113     xbt_dynar_free_container(&(globals->irecvs));
114   }
115   free(globals);
116 }
117
118 static void action_comm_size(const char *const *action)
119 {
120   double clock = smpi_process_simulated_elapsed();
121
122   communicator_size = parse_double(action[2]);
123   log_timed_action (action, clock);
124 }
125
126 static void action_comm_split(const char *const *action)
127 {
128   double clock = smpi_process_simulated_elapsed();
129
130   log_timed_action (action, clock);
131 }
132
133 static void action_comm_dup(const char *const *action)
134 {
135   double clock = smpi_process_simulated_elapsed();
136
137   log_timed_action (action, clock);
138 }
139
140 static void action_compute(const char *const *action)
141 {
142   double clock = smpi_process_simulated_elapsed();
143   smpi_execute_flops(parse_double(action[2]));
144
145   log_timed_action (action, clock);
146 }
147
148 static void action_send(const char *const *action)
149 {
150   int to = atoi(action[2]);
151   double size=parse_double(action[3]);
152   double clock = smpi_process_simulated_elapsed();
153
154   if(action[4]) {
155     MPI_CURRENT_TYPE=decode_datatype(action[4]);
156   } else {
157     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
158   }
159
160 #ifdef HAVE_TRACING
161   int rank = smpi_comm_rank(MPI_COMM_WORLD);
162
163   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
164   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
165   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
166 #endif
167
168   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
169
170   log_timed_action (action, clock);
171
172   #ifdef HAVE_TRACING
173   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
174 #endif
175
176 }
177
178 static void action_Isend(const char *const *action)
179 {
180   int to = atoi(action[2]);
181   double size=parse_double(action[3]);
182   double clock = smpi_process_simulated_elapsed();
183   MPI_Request request;
184
185   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
186   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
187
188 #ifdef HAVE_TRACING
189   int rank = smpi_comm_rank(MPI_COMM_WORLD);
190   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
191   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
192   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
193 #endif
194
195   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
196
197 #ifdef HAVE_TRACING
198   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
199   request->send = 1;
200 #endif
201
202   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
203
204   log_timed_action (action, clock);
205 }
206
207 static void action_recv(const char *const *action) {
208   int from = atoi(action[2]);
209   double size=parse_double(action[3]);
210   double clock = smpi_process_simulated_elapsed();
211   MPI_Status status;
212
213   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
214   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
215
216 #ifdef HAVE_TRACING
217   int rank = smpi_comm_rank(MPI_COMM_WORLD);
218   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
219
220   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
221 #endif
222
223   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
224
225 #ifdef HAVE_TRACING
226   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
227   TRACE_smpi_recv(rank, src_traced, rank);
228 #endif
229
230   log_timed_action (action, clock);
231 }
232
233 static void action_Irecv(const char *const *action)
234 {
235   int from = atoi(action[2]);
236   double size=parse_double(action[3]);
237   double clock = smpi_process_simulated_elapsed();
238   MPI_Request request;
239
240   smpi_replay_globals_t globals =
241      (smpi_replay_globals_t) smpi_process_get_user_data();
242
243   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
244   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
245
246 #ifdef HAVE_TRACING
247   int rank = smpi_comm_rank(MPI_COMM_WORLD);
248   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
249   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
250 #endif
251
252   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
253
254 #ifdef HAVE_TRACING
255   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
256   request->recv = 1;
257 #endif
258   xbt_dynar_push(globals->irecvs,&request);
259   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
260
261   log_timed_action (action, clock);
262 }
263
264 static void action_wait(const char *const *action){
265   double clock = smpi_process_simulated_elapsed();
266   MPI_Request request;
267   MPI_Status status;
268   smpi_replay_globals_t globals =
269       (smpi_replay_globals_t) smpi_process_get_user_data();
270
271   xbt_assert(xbt_dynar_length(globals->irecvs),
272       "action wait not preceded by any irecv: %s",
273       xbt_str_join_array(action," "));
274   request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
275 #ifdef HAVE_TRACING
276   int rank = request && request->comm != MPI_COMM_NULL
277       ? smpi_comm_rank(request->comm)
278       : -1;
279
280   MPI_Group group = smpi_comm_group(request->comm);
281   int src_traced = smpi_group_rank(group, request->src);
282   int dst_traced = smpi_group_rank(group, request->dst);
283   int is_wait_for_receive = request->recv;
284   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, -1);
285 #endif
286   smpi_mpi_wait(&request, &status);
287 #ifdef HAVE_TRACING
288   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
289   if (is_wait_for_receive) {
290     TRACE_smpi_recv(rank, src_traced, dst_traced);
291   }
292 #endif
293
294   log_timed_action (action, clock);
295 }
296
297 static void action_waitall(const char *const *action){
298   double clock = smpi_process_simulated_elapsed();
299   int count_requests=0;
300   unsigned int i=0;
301
302   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
303
304   if (count_requests>0) {
305     MPI_Request requests[count_requests];
306     MPI_Status status[count_requests];
307
308     /*  The reqq is an array of dynars. Its index corresponds to the rank.
309      Thus each rank saves its own requests to the array request. */
310     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
311
312   #ifdef HAVE_TRACING
313    //save information from requests
314
315    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
316    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
317    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
318    for (i = 0; i < count_requests; i++) {
319     if(requests[i]){
320       int *asrc = xbt_new(int, 1);
321       int *adst = xbt_new(int, 1);
322       int *arecv = xbt_new(int, 1);
323       *asrc = requests[i]->src;
324       *adst = requests[i]->dst;
325       *arecv = requests[i]->recv;
326       xbt_dynar_insert_at(srcs, i, asrc);
327       xbt_dynar_insert_at(dsts, i, adst);
328       xbt_dynar_insert_at(recvs, i, arecv);
329       xbt_free(asrc);
330       xbt_free(adst);
331       xbt_free(arecv);
332     }else {
333       int *t = xbt_new(int, 1);
334       xbt_dynar_insert_at(srcs, i, t);
335       xbt_dynar_insert_at(dsts, i, t);
336       xbt_dynar_insert_at(recvs, i, t);
337       xbt_free(t);
338     }
339    }
340    int rank_traced = smpi_process_index();
341    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__, count_requests);
342   #endif
343
344     smpi_mpi_waitall(count_requests, requests, status);
345
346   #ifdef HAVE_TRACING
347    for (i = 0; i < count_requests; i++) {
348     int src_traced, dst_traced, is_wait_for_receive;
349     xbt_dynar_get_cpy(srcs, i, &src_traced);
350     xbt_dynar_get_cpy(dsts, i, &dst_traced);
351     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
352     if (is_wait_for_receive) {
353       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
354     }
355    }
356    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
357    //clean-up of dynars
358    xbt_dynar_free(&srcs);
359    xbt_dynar_free(&dsts);
360    xbt_dynar_free(&recvs);
361   #endif
362
363    xbt_dynar_reset(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
364   }
365   log_timed_action (action, clock);
366 }
367
368 static void action_barrier(const char *const *action){
369   double clock = smpi_process_simulated_elapsed();
370 #ifdef HAVE_TRACING
371   int rank = smpi_comm_rank(MPI_COMM_WORLD);
372   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, smpi_comm_size(MPI_COMM_WORLD));
373 #endif
374   smpi_mpi_barrier(MPI_COMM_WORLD);
375 #ifdef HAVE_TRACING
376   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
377 #endif
378
379   log_timed_action (action, clock);
380 }
381
382
383 static void action_bcast(const char *const *action)
384 {
385   double size = parse_double(action[2]);
386   double clock = smpi_process_simulated_elapsed();
387   int root=0;
388   /*
389    * Initialize MPI_CURRENT_TYPE in order to decrease
390    * the number of the checks
391    * */
392   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
393
394   if(action[3]) {
395     root= atoi(action[3]);
396     if(action[4]) {
397       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
398     }
399   }
400
401 #ifdef HAVE_TRACING
402   int rank = smpi_comm_rank(MPI_COMM_WORLD);
403   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
404   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,size*smpi_datatype_size(MPI_CURRENT_TYPE));
405 #endif
406
407   smpi_mpi_bcast(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
408 #ifdef HAVE_TRACING
409   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
410 #endif
411
412   log_timed_action (action, clock);
413 }
414
415 static void action_reduce(const char *const *action)
416 {
417   double comm_size = parse_double(action[2]);
418   double comp_size = parse_double(action[3]);
419   double clock = smpi_process_simulated_elapsed();
420   int root=0;
421   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
422
423   if(action[4]) {
424     root= atoi(action[4]);
425     if(action[5]) {
426       MPI_CURRENT_TYPE=decode_datatype(action[5]);
427     }
428   }
429
430 #ifdef HAVE_TRACING
431   int rank = smpi_comm_rank(MPI_COMM_WORLD);
432   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
433   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,comm_size*smpi_datatype_size(MPI_CURRENT_TYPE));
434 #endif
435    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
436    smpi_execute_flops(comp_size);
437 #ifdef HAVE_TRACING
438   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
439 #endif
440
441   log_timed_action (action, clock);
442 }
443
444 static void action_allReduce(const char *const *action) {
445   double comm_size = parse_double(action[2]);
446   double comp_size = parse_double(action[3]);
447
448   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
449   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
450
451   double clock = smpi_process_simulated_elapsed();
452 #ifdef HAVE_TRACING
453   int rank = smpi_comm_rank(MPI_COMM_WORLD);
454   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,comp_size*smpi_datatype_size(MPI_CURRENT_TYPE));
455 #endif
456   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
457   smpi_execute_flops(comp_size);
458   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
459 #ifdef HAVE_TRACING
460   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
461 #endif
462
463   log_timed_action (action, clock);
464 }
465
466 static void action_allToAll(const char *const *action) {
467   double clock = smpi_process_simulated_elapsed();
468   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
469   int send_size = parse_double(action[2]);
470   int recv_size = parse_double(action[3]);
471   MPI_Datatype MPI_CURRENT_TYPE2;
472
473   if(action[4]) {
474     MPI_CURRENT_TYPE=decode_datatype(action[4]);
475     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
476   }
477   else {
478     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
479     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
480   }
481   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
482   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
483
484 #ifdef HAVE_TRACING
485   int rank = smpi_process_index();
486   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,send_size*smpi_datatype_size(MPI_CURRENT_TYPE));
487 #endif
488
489   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
490
491 #ifdef HAVE_TRACING
492   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
493 #endif
494
495   log_timed_action (action, clock);
496   xbt_free(send);
497   xbt_free(recv);
498 }
499
500
501 static void action_gather(const char *const *action) {
502   /*
503  The structure of the gather action for the rank 0 (total 4 processes) 
504  is the following:   
505  0 gather 68 68 0 0 0
506
507   where: 
508   1) 68 is the sendcounts
509   2) 68 is the recvcounts
510   3) 0 is the root node
511   4) 0 is the send datatype id, see decode_datatype()
512   5) 0 is the recv datatype id, see decode_datatype()
513   */
514   double clock = smpi_process_simulated_elapsed();
515   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
516   int send_size = parse_double(action[2]);
517   int recv_size = parse_double(action[3]);
518   MPI_Datatype MPI_CURRENT_TYPE2;
519   if(action[5]) {
520     MPI_CURRENT_TYPE=decode_datatype(action[5]);
521     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
522   } else {
523     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
524     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
525   }
526   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
527   void *recv = calloc(recv_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
528
529   int root=atoi(action[4]);
530   int rank = smpi_process_index();
531
532   if(rank==root)
533     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
534
535 #ifdef HAVE_TRACING
536   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,send_size*smpi_datatype_size(MPI_CURRENT_TYPE));
537 #endif
538 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
539                 recv, recv_size, MPI_CURRENT_TYPE2,
540                 root, MPI_COMM_WORLD);
541
542 #ifdef HAVE_TRACING
543   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
544 #endif
545
546   log_timed_action (action, clock);
547   xbt_free(send);
548   xbt_free(recv);
549 }
550
551
552 static void action_reducescatter(const char *const *action) {
553
554     /*
555  The structure of the reducescatter action for the rank 0 (total 4 processes) 
556  is the following:   
557 0 reduceScatter 275427 275427 275427 204020 11346849 0
558
559   where: 
560   1) The first four values after the name of the action declare the recvcounts array
561   2) The value 11346849 is the amount of instructions
562   3) The last value corresponds to the datatype, see decode_datatype().
563
564   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
565
566    */
567
568   double clock = smpi_process_simulated_elapsed();
569   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
570   int comp_size = parse_double(action[2+comm_size]);
571   int *recvcounts = xbt_new0(int, comm_size);  
572   int *disps = xbt_new0(int, comm_size);  
573   int i=0,recv_sum=0;
574   int root=0;
575   int rank = smpi_process_index();
576
577   if(action[3+comm_size])
578     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
579   else
580     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
581
582   for(i=0;i<comm_size;i++) {
583     recvcounts[i] = atoi(action[i+2]);
584     recv_sum=recv_sum+recvcounts[i];
585     disps[i] = 0;
586   }
587
588 #ifdef HAVE_TRACING
589   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, recv_sum*smpi_datatype_size(MPI_CURRENT_TYPE));
590 #endif
591    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
592        root, MPI_COMM_WORLD);
593    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
594                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
595    smpi_execute_flops(comp_size);
596
597
598 #ifdef HAVE_TRACING
599   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
600 #endif
601
602   log_timed_action (action, clock);
603 }
604
605
606 static void action_allgatherv(const char *const *action) {
607
608   /*
609  The structure of the allgatherv action for the rank 0 (total 4 processes) 
610  is the following:   
611 0 allGatherV 275427 275427 275427 275427 204020
612
613   where: 
614   1) 275427 is the sendcount
615   2) The next four elements declare the recvcounts array
616   3) No more values mean that the datatype for sent and receive buffer
617   is the default one, see decode_datatype().
618
619    */
620
621   double clock = smpi_process_simulated_elapsed();
622
623   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
624   int i=0;
625   int sendcount=atoi(action[2]);
626   int *recvcounts = xbt_new0(int, comm_size);  
627   int *disps = xbt_new0(int, comm_size);  
628   int recv_sum=0;  
629   MPI_Datatype MPI_CURRENT_TYPE2;
630
631   if(action[3+comm_size]) {
632     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
633     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
634   } else {
635     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
636     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
637   }
638   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
639
640   for(i=0;i<comm_size;i++) {
641     recvcounts[i] = atoi(action[i+3]);
642     recv_sum=recv_sum+recvcounts[i];
643   }
644   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
645
646 #ifdef HAVE_TRACING
647   int rank = MPI_COMM_WORLD != MPI_COMM_NULL ? smpi_process_index() : -1;
648   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,sendcount*smpi_datatype_size(MPI_CURRENT_TYPE));
649 #endif
650
651 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
652
653 #ifdef HAVE_TRACING
654   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
655 #endif
656
657   log_timed_action (action, clock);
658   xbt_free(sendbuf);
659   xbt_free(recvbuf);
660   xbt_free(recvcounts);
661   xbt_free(disps);
662 }
663
664
665 static void action_allToAllv(const char *const *action) {
666   /*
667  The structure of the allToAllV action for the rank 0 (total 4 processes) 
668  is the following:   
669   0 allToAllV 100 1 7 10 12 100 1 70 10 5
670
671   where: 
672   1) 100 is the size of the send buffer *sizeof(int),
673   2) 1 7 10 12 is the sendcounts array
674   3) 100*sizeof(int) is the size of the receiver buffer
675   4)  1 70 10 5 is the recvcounts array
676
677    */
678
679
680   double clock = smpi_process_simulated_elapsed();
681
682   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
683   int send_buf_size=0,recv_buf_size=0,i=0;
684   int *sendcounts = xbt_new0(int, comm_size);  
685   int *recvcounts = xbt_new0(int, comm_size);  
686   int *senddisps = xbt_new0(int, comm_size);  
687   int *recvdisps = xbt_new0(int, comm_size);  
688
689   MPI_Datatype MPI_CURRENT_TYPE2;
690
691   send_buf_size=parse_double(action[2]);
692   recv_buf_size=parse_double(action[3+comm_size]);
693   if(action[4+2*comm_size]) {
694     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
695     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
696   }
697   else {
698       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
699       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
700   }
701
702   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
703   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
704
705   for(i=0;i<comm_size;i++) {
706     sendcounts[i] = atoi(action[i+3]);
707     recvcounts[i] = atoi(action[i+4+comm_size]);
708   }
709
710
711 #ifdef HAVE_TRACING
712   int rank = MPI_COMM_WORLD != MPI_COMM_NULL ? smpi_process_index() : -1;
713   int count=0;
714   for(i=0;i<comm_size;i++) count+=sendcounts[i];
715   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,count*smpi_datatype_size(MPI_CURRENT_TYPE));
716 #endif
717     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
718                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
719                                MPI_COMM_WORLD);
720 #ifdef HAVE_TRACING
721   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
722 #endif
723
724   log_timed_action (action, clock);
725   xbt_free(sendbuf);
726   xbt_free(recvbuf);
727   xbt_free(sendcounts);
728   xbt_free(recvcounts);
729   xbt_free(senddisps);
730   xbt_free(recvdisps);
731 }
732
733 void smpi_replay_init(int *argc, char***argv){
734   PMPI_Init(argc, argv);
735   if (!smpi_process_index()){
736     _xbt_replay_action_init();
737     xbt_replay_action_register("init",       action_init);
738     xbt_replay_action_register("finalize",   action_finalize);
739     xbt_replay_action_register("comm_size",  action_comm_size);
740     xbt_replay_action_register("comm_split", action_comm_split);
741     xbt_replay_action_register("comm_dup",   action_comm_dup);
742     xbt_replay_action_register("send",       action_send);
743     xbt_replay_action_register("Isend",      action_Isend);
744     xbt_replay_action_register("recv",       action_recv);
745     xbt_replay_action_register("Irecv",      action_Irecv);
746     xbt_replay_action_register("wait",       action_wait);
747     xbt_replay_action_register("waitAll",    action_waitall);
748     xbt_replay_action_register("barrier",    action_barrier);
749     xbt_replay_action_register("bcast",      action_bcast);
750     xbt_replay_action_register("reduce",     action_reduce);
751     xbt_replay_action_register("allReduce",  action_allReduce);
752     xbt_replay_action_register("allToAll",   action_allToAll);
753     xbt_replay_action_register("allToAllV",  action_allToAllv);
754     xbt_replay_action_register("gather",  action_gather);
755     xbt_replay_action_register("allGatherV",  action_allgatherv);
756     xbt_replay_action_register("reduceScatter",  action_reducescatter);
757     xbt_replay_action_register("compute",    action_compute);
758   }
759
760   xbt_replay_action_runner(*argc, *argv);
761 }
762
763 int smpi_replay_finalize(){
764   double sim_time= 1.;
765   /* One active process will stop. Decrease the counter*/
766   active_processes--;
767   XBT_DEBUG("There are %lu elements in reqq[*]",
768             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
769   xbt_dynar_free(&reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
770   if(!active_processes){
771     /* Last process alive speaking */
772     /* end the simulated timer */
773     sim_time = smpi_process_simulated_elapsed();
774     XBT_INFO("Simulation time %g", sim_time);
775     _xbt_replay_action_exit();
776     xbt_free(reqq);
777     reqq = NULL;
778   }
779   smpi_mpi_barrier(MPI_COMM_WORLD);
780   return PMPI_Finalize();
781 }