Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
d52ec60c41b491f17eb6814073aae3f6c447c8b6
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009 - 2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static void log_timed_action (const char *const *action, double clock){
23   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24     char *name = xbt_str_join_array(action, " ");
25     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
26     free(name);
27   }
28 }
29
30 typedef struct {
31   xbt_dynar_t irecvs; /* of MPI_Request */
32 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
33
34
35 /* Helper function */
36 static double parse_double(const char *string)
37 {
38   double value;
39   char *endptr;
40   value = strtod(string, &endptr);
41   if (*endptr != '\0')
42     THROWF(unknown_error, 0, "%s is not a double", string);
43   return value;
44 }
45
46 static MPI_Datatype decode_datatype(const char *const action)
47 {
48 // Declared datatypes,
49
50   switch(atoi(action))
51   {
52     case 0:
53       MPI_CURRENT_TYPE=MPI_DOUBLE;
54       break;
55     case 1:
56       MPI_CURRENT_TYPE=MPI_INT;
57       break;
58     case 2:
59       MPI_CURRENT_TYPE=MPI_CHAR;
60       break;
61     case 3:
62       MPI_CURRENT_TYPE=MPI_SHORT;
63       break;
64     case 4:
65       MPI_CURRENT_TYPE=MPI_LONG;
66       break;
67     case 5:
68       MPI_CURRENT_TYPE=MPI_FLOAT;
69       break;
70     case 6:
71       MPI_CURRENT_TYPE=MPI_BYTE;
72       break;
73     default:
74       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
75
76   }
77    return MPI_CURRENT_TYPE;
78 }
79
80 static void action_init(const char *const *action)
81 {
82   int i;
83   XBT_DEBUG("Initialize the counters");
84   smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
85   globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
86
87   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
88   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
89
90   smpi_process_set_user_data((void*) globals);
91
92   /* start a simulated timer */
93   smpi_process_simulated_start();
94   /*initialize the number of active processes */
95   active_processes = smpi_process_count();
96
97   if (!reqq) {
98     reqq=xbt_new0(xbt_dynar_t,active_processes);
99
100     for(i=0;i<active_processes;i++){
101       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),NULL);
102     }
103   }
104 }
105
106 static void action_finalize(const char *const *action)
107 {
108   smpi_replay_globals_t globals =
109       (smpi_replay_globals_t) smpi_process_get_user_data();
110   if (globals){
111     XBT_DEBUG("There are %lu irecvs in the dynar",
112          xbt_dynar_length(globals->irecvs));
113     xbt_dynar_free_container(&(globals->irecvs));
114   }
115   free(globals);
116 }
117
118 static void action_comm_size(const char *const *action)
119 {
120   double clock = smpi_process_simulated_elapsed();
121
122   communicator_size = parse_double(action[2]);
123   log_timed_action (action, clock);
124 }
125
126 static void action_comm_split(const char *const *action)
127 {
128   double clock = smpi_process_simulated_elapsed();
129
130   log_timed_action (action, clock);
131 }
132
133 static void action_comm_dup(const char *const *action)
134 {
135   double clock = smpi_process_simulated_elapsed();
136
137   log_timed_action (action, clock);
138 }
139
140 static void action_compute(const char *const *action)
141 {
142   double clock = smpi_process_simulated_elapsed();
143   smpi_execute_flops(parse_double(action[2]));
144
145   log_timed_action (action, clock);
146 }
147
148 static void action_send(const char *const *action)
149 {
150   int to = atoi(action[2]);
151   double size=parse_double(action[3]);
152   double clock = smpi_process_simulated_elapsed();
153
154   if(action[4]) {
155     MPI_CURRENT_TYPE=decode_datatype(action[4]);
156   } else {
157     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
158   }
159
160 #ifdef HAVE_TRACING
161   int rank = smpi_comm_rank(MPI_COMM_WORLD);
162   TRACE_smpi_computing_out(rank);
163   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
164   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
165   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
166 #endif
167
168   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
169
170   log_timed_action (action, clock);
171
172   #ifdef HAVE_TRACING
173   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
174   TRACE_smpi_computing_in(rank);
175 #endif
176
177 }
178
179 static void action_Isend(const char *const *action)
180 {
181   int to = atoi(action[2]);
182   double size=parse_double(action[3]);
183   double clock = smpi_process_simulated_elapsed();
184   MPI_Request request;
185
186   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
187   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
188
189 #ifdef HAVE_TRACING
190   int rank = smpi_comm_rank(MPI_COMM_WORLD);
191   TRACE_smpi_computing_out(rank);
192   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
193   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
194   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
195 #endif
196
197   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
198
199 #ifdef HAVE_TRACING
200   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
201   request->send = 1;
202   TRACE_smpi_computing_in(rank);
203 #endif
204
205   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_recv(const char *const *action) {
211   int from = atoi(action[2]);
212   double size=parse_double(action[3]);
213   double clock = smpi_process_simulated_elapsed();
214   MPI_Status status;
215
216   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
217   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
218
219 #ifdef HAVE_TRACING
220   int rank = smpi_comm_rank(MPI_COMM_WORLD);
221   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
222   TRACE_smpi_computing_out(rank);
223
224   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 #endif
226
227   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
228
229 #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
231   TRACE_smpi_recv(rank, src_traced, rank);
232   TRACE_smpi_computing_in(rank);
233 #endif
234
235   log_timed_action (action, clock);
236 }
237
238 static void action_Irecv(const char *const *action)
239 {
240   int from = atoi(action[2]);
241   double size=parse_double(action[3]);
242   double clock = smpi_process_simulated_elapsed();
243   MPI_Request request;
244
245   smpi_replay_globals_t globals =
246      (smpi_replay_globals_t) smpi_process_get_user_data();
247
248   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
249   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250
251 #ifdef HAVE_TRACING
252   int rank = smpi_comm_rank(MPI_COMM_WORLD);
253   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
254   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
255 #endif
256
257   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
258
259 #ifdef HAVE_TRACING
260   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
261   request->recv = 1;
262 #endif
263   xbt_dynar_push(globals->irecvs,&request);
264   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
265
266   log_timed_action (action, clock);
267 }
268
269 static void action_wait(const char *const *action){
270   double clock = smpi_process_simulated_elapsed();
271   MPI_Request request;
272   MPI_Status status;
273   smpi_replay_globals_t globals =
274       (smpi_replay_globals_t) smpi_process_get_user_data();
275
276   xbt_assert(xbt_dynar_length(globals->irecvs),
277       "action wait not preceded by any irecv: %s",
278       xbt_str_join_array(action," "));
279   request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
280 #ifdef HAVE_TRACING
281   int rank = request && request->comm != MPI_COMM_NULL
282       ? smpi_comm_rank(request->comm)
283       : -1;
284   TRACE_smpi_computing_out(rank);
285
286   MPI_Group group = smpi_comm_group(request->comm);
287   int src_traced = smpi_group_rank(group, request->src);
288   int dst_traced = smpi_group_rank(group, request->dst);
289   int is_wait_for_receive = request->recv;
290   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, -1);
291 #endif
292   smpi_mpi_wait(&request, &status);
293 #ifdef HAVE_TRACING
294   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
295   if (is_wait_for_receive) {
296     TRACE_smpi_recv(rank, src_traced, dst_traced);
297   }
298   TRACE_smpi_computing_in(rank);
299 #endif
300
301   log_timed_action (action, clock);
302 }
303
304 static void action_waitall(const char *const *action){
305   double clock = smpi_process_simulated_elapsed();
306   int count_requests=0;
307   unsigned int i=0;
308
309   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
310
311   if (count_requests>0) {
312     MPI_Request requests[count_requests];
313     MPI_Status status[count_requests];
314
315     /*  The reqq is an array of dynars. Its index corresponds to the rank.
316      Thus each rank saves its own requests to the array request. */
317     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
318
319   #ifdef HAVE_TRACING
320    //save information from requests
321
322    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
323    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
324    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
325    for (i = 0; i < count_requests; i++) {
326     if(requests[i]){
327       int *asrc = xbt_new(int, 1);
328       int *adst = xbt_new(int, 1);
329       int *arecv = xbt_new(int, 1);
330       *asrc = requests[i]->src;
331       *adst = requests[i]->dst;
332       *arecv = requests[i]->recv;
333       xbt_dynar_insert_at(srcs, i, asrc);
334       xbt_dynar_insert_at(dsts, i, adst);
335       xbt_dynar_insert_at(recvs, i, arecv);
336       xbt_free(asrc);
337       xbt_free(adst);
338       xbt_free(arecv);
339     }else {
340       int *t = xbt_new(int, 1);
341       xbt_dynar_insert_at(srcs, i, t);
342       xbt_dynar_insert_at(dsts, i, t);
343       xbt_dynar_insert_at(recvs, i, t);
344       xbt_free(t);
345     }
346    }
347    int rank_traced = smpi_process_index();
348    TRACE_smpi_computing_out(rank_traced);
349
350    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__, count_requests);
351   #endif
352
353     smpi_mpi_waitall(count_requests, requests, status);
354
355   #ifdef HAVE_TRACING
356    for (i = 0; i < count_requests; i++) {
357     int src_traced, dst_traced, is_wait_for_receive;
358     xbt_dynar_get_cpy(srcs, i, &src_traced);
359     xbt_dynar_get_cpy(dsts, i, &dst_traced);
360     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
361     if (is_wait_for_receive) {
362       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
363     }
364    }
365    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
366    //clean-up of dynars
367    xbt_dynar_free(&srcs);
368    xbt_dynar_free(&dsts);
369    xbt_dynar_free(&recvs);
370    TRACE_smpi_computing_in(rank_traced);
371   #endif
372
373    xbt_dynar_reset(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
374   }
375   log_timed_action (action, clock);
376 }
377
378 static void action_barrier(const char *const *action){
379   double clock = smpi_process_simulated_elapsed();
380 #ifdef HAVE_TRACING
381   int rank = smpi_comm_rank(MPI_COMM_WORLD);
382   TRACE_smpi_computing_out(rank);
383   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, smpi_comm_size(MPI_COMM_WORLD));
384 #endif
385   smpi_mpi_barrier(MPI_COMM_WORLD);
386 #ifdef HAVE_TRACING
387   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
388   TRACE_smpi_computing_in(rank);
389 #endif
390
391   log_timed_action (action, clock);
392 }
393
394
395 static void action_bcast(const char *const *action)
396 {
397   double size = parse_double(action[2]);
398   double clock = smpi_process_simulated_elapsed();
399   int root=0;
400   /*
401    * Initialize MPI_CURRENT_TYPE in order to decrease
402    * the number of the checks
403    * */
404   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
405
406   if(action[3]) {
407     root= atoi(action[3]);
408     if(action[4]) {
409       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
410     }
411   }
412
413 #ifdef HAVE_TRACING
414   int rank = smpi_comm_rank(MPI_COMM_WORLD);
415   TRACE_smpi_computing_out(rank);
416   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
417   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,size*smpi_datatype_size(MPI_CURRENT_TYPE));
418 #endif
419
420   smpi_mpi_bcast(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
421 #ifdef HAVE_TRACING
422   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
423   TRACE_smpi_computing_in(rank);
424 #endif
425
426   log_timed_action (action, clock);
427 }
428
429 static void action_reduce(const char *const *action)
430 {
431   double comm_size = parse_double(action[2]);
432   double comp_size = parse_double(action[3]);
433   double clock = smpi_process_simulated_elapsed();
434   int root=0;
435   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
436
437   if(action[4]) {
438     root= atoi(action[4]);
439     if(action[5]) {
440       MPI_CURRENT_TYPE=decode_datatype(action[5]);
441     }
442   }
443
444 #ifdef HAVE_TRACING
445   int rank = smpi_comm_rank(MPI_COMM_WORLD);
446   TRACE_smpi_computing_out(rank);
447   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
448   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,comm_size*smpi_datatype_size(MPI_CURRENT_TYPE));
449 #endif
450    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
451    smpi_execute_flops(comp_size);
452 #ifdef HAVE_TRACING
453   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
454   TRACE_smpi_computing_in(rank);
455 #endif
456
457   log_timed_action (action, clock);
458 }
459
460 static void action_allReduce(const char *const *action) {
461   double comm_size = parse_double(action[2]);
462   double comp_size = parse_double(action[3]);
463
464   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
465   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
466
467   double clock = smpi_process_simulated_elapsed();
468 #ifdef HAVE_TRACING
469   int rank = smpi_comm_rank(MPI_COMM_WORLD);
470   TRACE_smpi_computing_out(rank);
471   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,comp_size*smpi_datatype_size(MPI_CURRENT_TYPE));
472 #endif
473   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
474   smpi_execute_flops(comp_size);
475   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
476 #ifdef HAVE_TRACING
477   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
478   TRACE_smpi_computing_in(rank);
479 #endif
480
481   log_timed_action (action, clock);
482 }
483
484 static void action_allToAll(const char *const *action) {
485   double clock = smpi_process_simulated_elapsed();
486   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
487   int send_size = parse_double(action[2]);
488   int recv_size = parse_double(action[3]);
489   MPI_Datatype MPI_CURRENT_TYPE2;
490
491   if(action[4]) {
492     MPI_CURRENT_TYPE=decode_datatype(action[4]);
493     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
494   }
495   else {
496     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
497     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
498   }
499   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
500   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
501
502 #ifdef HAVE_TRACING
503   int rank = smpi_process_index();
504   TRACE_smpi_computing_out(rank);
505   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,send_size*smpi_datatype_size(MPI_CURRENT_TYPE));
506 #endif
507
508   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
509
510 #ifdef HAVE_TRACING
511   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
512   TRACE_smpi_computing_in(rank);
513 #endif
514
515   log_timed_action (action, clock);
516   xbt_free(send);
517   xbt_free(recv);
518 }
519
520
521 static void action_gather(const char *const *action) {
522   /*
523  The structure of the gather action for the rank 0 (total 4 processes) 
524  is the following:   
525  0 gather 68 68 0 0 0
526
527   where: 
528   1) 68 is the sendcounts
529   2) 68 is the recvcounts
530   3) 0 is the root node
531   4) 0 is the send datatype id, see decode_datatype()
532   5) 0 is the recv datatype id, see decode_datatype()
533   */
534   double clock = smpi_process_simulated_elapsed();
535   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
536   int send_size = parse_double(action[2]);
537   int recv_size = parse_double(action[3]);
538   MPI_Datatype MPI_CURRENT_TYPE2;
539   if(action[5]) {
540     MPI_CURRENT_TYPE=decode_datatype(action[5]);
541     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
542   } else {
543     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
544     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
545   }
546   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
547   void *recv = calloc(recv_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
548
549   int root=atoi(action[4]);
550   int rank = smpi_process_index();
551
552   if(rank==root)
553     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
554
555 #ifdef HAVE_TRACING
556   TRACE_smpi_computing_out(rank);
557   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,send_size*smpi_datatype_size(MPI_CURRENT_TYPE));
558 #endif
559 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
560                 recv, recv_size, MPI_CURRENT_TYPE2,
561                 root, MPI_COMM_WORLD);
562
563 #ifdef HAVE_TRACING
564   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
565   TRACE_smpi_computing_in(rank);
566 #endif
567
568   log_timed_action (action, clock);
569   xbt_free(send);
570   xbt_free(recv);
571 }
572
573
574 static void action_reducescatter(const char *const *action) {
575
576     /*
577  The structure of the reducescatter action for the rank 0 (total 4 processes) 
578  is the following:   
579 0 reduceScatter 275427 275427 275427 204020 11346849 0
580
581   where: 
582   1) The first four values after the name of the action declare the recvcounts array
583   2) The value 11346849 is the amount of instructions
584   3) The last value corresponds to the datatype, see decode_datatype().
585
586   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
587
588    */
589
590   double clock = smpi_process_simulated_elapsed();
591   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
592   int comp_size = parse_double(action[2+comm_size]);
593   int *recvcounts = xbt_new0(int, comm_size);  
594   int *disps = xbt_new0(int, comm_size);  
595   int i=0,recv_sum=0;
596   int root=0;
597   int rank = smpi_process_index();
598
599   if(action[3+comm_size])
600     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
601   else
602     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
603
604   for(i=0;i<comm_size;i++) {
605     recvcounts[i] = atoi(action[i+2]);
606     recv_sum=recv_sum+recvcounts[i];
607     disps[i] = 0;
608   }
609
610 #ifdef HAVE_TRACING
611   TRACE_smpi_computing_out(rank);
612   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, recv_sum*smpi_datatype_size(MPI_CURRENT_TYPE));
613 #endif
614    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
615        root, MPI_COMM_WORLD);
616    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
617                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
618    smpi_execute_flops(comp_size);
619
620
621 #ifdef HAVE_TRACING
622   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
623   TRACE_smpi_computing_in(rank);
624 #endif
625
626   log_timed_action (action, clock);
627 }
628
629
630 static void action_allgatherv(const char *const *action) {
631
632   /*
633  The structure of the allgatherv action for the rank 0 (total 4 processes) 
634  is the following:   
635 0 allGatherV 275427 275427 275427 275427 204020 0 275427 550854 826281 
636
637   where: 
638   1) 275427 is the sendcount
639   2) The next four elements declare the recvcounts array
640   3) The next four values declare the disps array
641   4) No more values mean that the datatype for sent and receive buffer
642   is the default one, see decode_datatype().
643
644    */
645
646   double clock = smpi_process_simulated_elapsed();
647
648   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
649   int i=0;
650   int sendcount=atoi(action[2]);
651   int *recvcounts = xbt_new0(int, comm_size);  
652   int *disps = xbt_new0(int, comm_size);  
653   int recv_sum=0;  
654   MPI_Datatype MPI_CURRENT_TYPE2;
655
656   if(action[3+2*comm_size]) {
657     MPI_CURRENT_TYPE = decode_datatype(action[3+2*comm_size]);
658     MPI_CURRENT_TYPE2 = decode_datatype(action[4+2*comm_size]);
659   } else {
660     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
661     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
662   }
663   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
664
665   for(i=0;i<comm_size;i++) {
666     recvcounts[i] = atoi(action[i+3]);
667     recv_sum=recv_sum+recvcounts[i];
668     disps[i] = atoi(action[i+3+comm_size]);
669   }
670   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
671
672 #ifdef HAVE_TRACING
673   int rank = MPI_COMM_WORLD != MPI_COMM_NULL ? smpi_process_index() : -1;
674   TRACE_smpi_computing_out(rank);
675   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,sendcount*smpi_datatype_size(MPI_CURRENT_TYPE));
676 #endif
677
678 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
679
680 #ifdef HAVE_TRACING
681   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
682   TRACE_smpi_computing_in(rank);
683 #endif
684
685   log_timed_action (action, clock);
686   xbt_free(sendbuf);
687   xbt_free(recvbuf);
688   xbt_free(recvcounts);
689   xbt_free(disps);
690 }
691
692
693 static void action_allToAllv(const char *const *action) {
694   /*
695  The structure of the allToAllV action for the rank 0 (total 4 processes) 
696  is the following:   
697   0 allToAllV 100 1 7 10 12 5 10 20 45 100 1 70 10 5 1 5 77 90
698
699   where: 
700   1) 100 is the size of the send buffer *sizeof(int),
701   2) 1 7 10 12 is the sendcounts array
702   3) 5 10 20 45 is the sdispls array
703   4) 100*sizeof(int) is the size of the receiver buffer
704   5)  1 70 10 5 is the recvcounts array
705   6) 1 5 77 90 is the rdispls array
706
707    */
708
709
710   double clock = smpi_process_simulated_elapsed();
711
712   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
713   int send_buf_size=0,recv_buf_size=0,i=0;
714   int *sendcounts = xbt_new0(int, comm_size);  
715   int *recvcounts = xbt_new0(int, comm_size);  
716   int *senddisps = xbt_new0(int, comm_size);  
717   int *recvdisps = xbt_new0(int, comm_size);  
718
719   MPI_Datatype MPI_CURRENT_TYPE2;
720
721   send_buf_size=parse_double(action[2]);
722   recv_buf_size=parse_double(action[3+2*comm_size]);
723   if(action[4+4*comm_size]) {
724     MPI_CURRENT_TYPE=decode_datatype(action[4+4*comm_size]);    
725     MPI_CURRENT_TYPE2=decode_datatype(action[5+4*comm_size]);    
726   }
727   else {
728       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
729       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
730   }
731
732   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
733   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
734
735   for(i=0;i<comm_size;i++) {
736     sendcounts[i] = atoi(action[i+3]);
737     senddisps[i] = atoi(action[i+3+comm_size]);
738     recvcounts[i] = atoi(action[i+4+2*comm_size]);
739     recvdisps[i] = atoi(action[i+4+3*comm_size]);
740   }
741
742
743 #ifdef HAVE_TRACING
744   int rank = MPI_COMM_WORLD != MPI_COMM_NULL ? smpi_process_index() : -1;
745   TRACE_smpi_computing_out(rank);
746   int count=0;
747   for(i=0;i<comm_size;i++) count+=sendcounts[i];
748   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,count*smpi_datatype_size(MPI_CURRENT_TYPE));
749 #endif
750     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps,      MPI_CURRENT_TYPE,
751                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
752                                MPI_COMM_WORLD);
753 #ifdef HAVE_TRACING
754   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
755   TRACE_smpi_computing_in(rank);
756 #endif
757
758   log_timed_action (action, clock);
759   xbt_free(sendbuf);
760   xbt_free(recvbuf);
761   xbt_free(sendcounts);
762   xbt_free(recvcounts);
763   xbt_free(senddisps);
764   xbt_free(recvdisps);
765 }
766
767 void smpi_replay_init(int *argc, char***argv){
768   PMPI_Init(argc, argv);
769   if (!smpi_process_index()){
770     _xbt_replay_action_init();
771     xbt_replay_action_register("init",       action_init);
772     xbt_replay_action_register("finalize",   action_finalize);
773     xbt_replay_action_register("comm_size",  action_comm_size);
774     xbt_replay_action_register("comm_split", action_comm_split);
775     xbt_replay_action_register("comm_dup",   action_comm_dup);
776     xbt_replay_action_register("send",       action_send);
777     xbt_replay_action_register("Isend",      action_Isend);
778     xbt_replay_action_register("recv",       action_recv);
779     xbt_replay_action_register("Irecv",      action_Irecv);
780     xbt_replay_action_register("wait",       action_wait);
781     xbt_replay_action_register("waitAll",    action_waitall);
782     xbt_replay_action_register("barrier",    action_barrier);
783     xbt_replay_action_register("bcast",      action_bcast);
784     xbt_replay_action_register("reduce",     action_reduce);
785     xbt_replay_action_register("allReduce",  action_allReduce);
786     xbt_replay_action_register("allToAll",   action_allToAll);
787     xbt_replay_action_register("allToAllV",  action_allToAllv);
788     xbt_replay_action_register("gather",  action_gather);
789     xbt_replay_action_register("allGatherV",  action_allgatherv);
790     xbt_replay_action_register("reduceScatter",  action_reducescatter);
791     xbt_replay_action_register("compute",    action_compute);
792   }
793
794   xbt_replay_action_runner(*argc, *argv);
795 }
796
797 int smpi_replay_finalize(){
798   double sim_time= 1.;
799   /* One active process will stop. Decrease the counter*/
800   active_processes--;
801   XBT_DEBUG("There are %lu elements in reqq[*]",
802             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
803   xbt_dynar_free(&reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
804   if(!active_processes){
805     /* Last process alive speaking */
806     /* end the simulated timer */
807     sim_time = smpi_process_simulated_elapsed();
808     XBT_INFO("Simulation time %g", sim_time);
809     _xbt_replay_action_exit();
810     xbt_free(reqq);
811     reqq = NULL;
812   }
813   smpi_mpi_barrier(MPI_COMM_WORLD);
814   return PMPI_Finalize();
815 }