Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009 - 2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static void log_timed_action (const char *const *action, double clock){
23   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24     char *name = xbt_str_join_array(action, " ");
25     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
26     free(name);
27   }
28 }
29
30 typedef struct {
31   xbt_dynar_t irecvs; /* of MPI_Request */
32 } s_smpi_replay_globals_t, *smpi_replay_globals_t;
33
34
35 /* Helper function */
36 static double parse_double(const char *string)
37 {
38   double value;
39   char *endptr;
40   value = strtod(string, &endptr);
41   if (*endptr != '\0')
42     THROWF(unknown_error, 0, "%s is not a double", string);
43   return value;
44 }
45
46 static MPI_Datatype decode_datatype(const char *const action)
47 {
48 // Declared datatypes,
49
50   switch(atoi(action))
51   {
52     case 0:
53       MPI_CURRENT_TYPE=MPI_DOUBLE;
54       break;
55     case 1:
56       MPI_CURRENT_TYPE=MPI_INT;
57       break;
58     case 2:
59       MPI_CURRENT_TYPE=MPI_CHAR;
60       break;
61     case 3:
62       MPI_CURRENT_TYPE=MPI_SHORT;
63       break;
64     case 4:
65       MPI_CURRENT_TYPE=MPI_LONG;
66       break;
67     case 5:
68       MPI_CURRENT_TYPE=MPI_FLOAT;
69       break;
70     case 6:
71       MPI_CURRENT_TYPE=MPI_BYTE;
72       break;
73     default:
74       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
75
76   }
77    return MPI_CURRENT_TYPE;
78 }
79
80 static void action_init(const char *const *action)
81 {
82   int i;
83   XBT_DEBUG("Initialize the counters");
84   smpi_replay_globals_t globals =  xbt_new(s_smpi_replay_globals_t, 1);
85   globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL);
86
87   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
88   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
89
90   smpi_process_set_user_data((void*) globals);
91
92   /* start a simulated timer */
93   smpi_process_simulated_start();
94   /*initialize the number of active processes */
95   active_processes = smpi_process_count();
96
97   if (!reqq) {
98     reqq=xbt_new0(xbt_dynar_t,active_processes);
99
100     for(i=0;i<active_processes;i++){
101       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),NULL);
102     }
103   }
104 }
105
106 static void action_finalize(const char *const *action)
107 {
108   smpi_replay_globals_t globals =
109       (smpi_replay_globals_t) smpi_process_get_user_data();
110   if (globals){
111     XBT_DEBUG("There are %lu irecvs in the dynar",
112          xbt_dynar_length(globals->irecvs));
113     xbt_dynar_free_container(&(globals->irecvs));
114   }
115   free(globals);
116 }
117
118 static void action_comm_size(const char *const *action)
119 {
120   double clock = smpi_process_simulated_elapsed();
121
122   communicator_size = parse_double(action[2]);
123   log_timed_action (action, clock);
124 }
125
126 static void action_comm_split(const char *const *action)
127 {
128   double clock = smpi_process_simulated_elapsed();
129
130   log_timed_action (action, clock);
131 }
132
133 static void action_comm_dup(const char *const *action)
134 {
135   double clock = smpi_process_simulated_elapsed();
136
137   log_timed_action (action, clock);
138 }
139
140 static void action_compute(const char *const *action)
141 {
142   double clock = smpi_process_simulated_elapsed();
143   smpi_execute_flops(parse_double(action[2]));
144
145   log_timed_action (action, clock);
146 }
147
148 static void action_send(const char *const *action)
149 {
150   int to = atoi(action[2]);
151   double size=parse_double(action[3]);
152   double clock = smpi_process_simulated_elapsed();
153
154   if(action[4]) {
155     MPI_CURRENT_TYPE=decode_datatype(action[4]);
156   } else {
157     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
158   }
159
160 #ifdef HAVE_TRACING
161   int rank = smpi_comm_rank(MPI_COMM_WORLD);
162   TRACE_smpi_computing_out(rank);
163   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
164   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
165   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
166 #endif
167
168   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
169
170   log_timed_action (action, clock);
171
172   #ifdef HAVE_TRACING
173   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
174   TRACE_smpi_computing_in(rank);
175 #endif
176
177 }
178
179 static void action_Isend(const char *const *action)
180 {
181   int to = atoi(action[2]);
182   double size=parse_double(action[3]);
183   double clock = smpi_process_simulated_elapsed();
184   MPI_Request request;
185
186   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
187   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
188
189 #ifdef HAVE_TRACING
190   int rank = smpi_comm_rank(MPI_COMM_WORLD);
191   TRACE_smpi_computing_out(rank);
192   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
193   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
194   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
195 #endif
196
197   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
198
199 #ifdef HAVE_TRACING
200   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
201   request->send = 1;
202   TRACE_smpi_computing_in(rank);
203 #endif
204
205   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_recv(const char *const *action) {
211   int from = atoi(action[2]);
212   double size=parse_double(action[3]);
213   double clock = smpi_process_simulated_elapsed();
214   MPI_Status status;
215
216   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
217   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
218
219 #ifdef HAVE_TRACING
220   int rank = smpi_comm_rank(MPI_COMM_WORLD);
221   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
222   TRACE_smpi_computing_out(rank);
223
224   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 #endif
226
227   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
228
229 #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
231   TRACE_smpi_recv(rank, src_traced, rank);
232   TRACE_smpi_computing_in(rank);
233 #endif
234
235   log_timed_action (action, clock);
236 }
237
238 static void action_Irecv(const char *const *action)
239 {
240   int from = atoi(action[2]);
241   double size=parse_double(action[3]);
242   double clock = smpi_process_simulated_elapsed();
243   MPI_Request request;
244
245   smpi_replay_globals_t globals =
246      (smpi_replay_globals_t) smpi_process_get_user_data();
247
248   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
249   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250
251 #ifdef HAVE_TRACING
252   int rank = smpi_comm_rank(MPI_COMM_WORLD);
253   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
254   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, size*smpi_datatype_size(MPI_CURRENT_TYPE));
255 #endif
256
257   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
258
259 #ifdef HAVE_TRACING
260   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
261   request->recv = 1;
262 #endif
263   xbt_dynar_push(globals->irecvs,&request);
264   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
265
266   log_timed_action (action, clock);
267 }
268
269 static void action_wait(const char *const *action){
270   double clock = smpi_process_simulated_elapsed();
271   MPI_Request request;
272   MPI_Status status;
273   smpi_replay_globals_t globals =
274       (smpi_replay_globals_t) smpi_process_get_user_data();
275
276   xbt_assert(xbt_dynar_length(globals->irecvs),
277       "action wait not preceded by any irecv: %s",
278       xbt_str_join_array(action," "));
279   request = xbt_dynar_pop_as(globals->irecvs,MPI_Request);
280 #ifdef HAVE_TRACING
281   int rank = request && request->comm != MPI_COMM_NULL
282       ? smpi_comm_rank(request->comm)
283       : -1;
284   TRACE_smpi_computing_out(rank);
285
286   MPI_Group group = smpi_comm_group(request->comm);
287   int src_traced = smpi_group_rank(group, request->src);
288   int dst_traced = smpi_group_rank(group, request->dst);
289   int is_wait_for_receive = request->recv;
290   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, -1);
291 #endif
292   smpi_mpi_wait(&request, &status);
293 #ifdef HAVE_TRACING
294   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
295   if (is_wait_for_receive) {
296     TRACE_smpi_recv(rank, src_traced, dst_traced);
297   }
298   TRACE_smpi_computing_in(rank);
299 #endif
300
301   log_timed_action (action, clock);
302 }
303
304 static void action_waitall(const char *const *action){
305   double clock = smpi_process_simulated_elapsed();
306   int count_requests=0;
307   unsigned int i=0;
308
309   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
310
311   if (count_requests>0) {
312     MPI_Request requests[count_requests];
313     MPI_Status status[count_requests];
314
315     /*  The reqq is an array of dynars. Its index corresponds to the rank.
316      Thus each rank saves its own requests to the array request. */
317     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
318
319   #ifdef HAVE_TRACING
320    //save information from requests
321
322    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
323    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
324    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
325    for (i = 0; i < count_requests; i++) {
326     if(requests[i]){
327       int *asrc = xbt_new(int, 1);
328       int *adst = xbt_new(int, 1);
329       int *arecv = xbt_new(int, 1);
330       *asrc = requests[i]->src;
331       *adst = requests[i]->dst;
332       *arecv = requests[i]->recv;
333       xbt_dynar_insert_at(srcs, i, asrc);
334       xbt_dynar_insert_at(dsts, i, adst);
335       xbt_dynar_insert_at(recvs, i, arecv);
336       xbt_free(asrc);
337       xbt_free(adst);
338       xbt_free(arecv);
339     }else {
340       int *t = xbt_new(int, 1);
341       xbt_dynar_insert_at(srcs, i, t);
342       xbt_dynar_insert_at(dsts, i, t);
343       xbt_dynar_insert_at(recvs, i, t);
344       xbt_free(t);
345     }
346    }
347    int rank_traced = smpi_process_index();
348    TRACE_smpi_computing_out(rank_traced);
349
350    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__, count_requests);
351   #endif
352
353     smpi_mpi_waitall(count_requests, requests, status);
354
355   #ifdef HAVE_TRACING
356    for (i = 0; i < count_requests; i++) {
357     int src_traced, dst_traced, is_wait_for_receive;
358     xbt_dynar_get_cpy(srcs, i, &src_traced);
359     xbt_dynar_get_cpy(dsts, i, &dst_traced);
360     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
361     if (is_wait_for_receive) {
362       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
363     }
364    }
365    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
366    //clean-up of dynars
367    xbt_dynar_free(&srcs);
368    xbt_dynar_free(&dsts);
369    xbt_dynar_free(&recvs);
370    TRACE_smpi_computing_in(rank_traced);
371   #endif
372
373    xbt_dynar_reset(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
374   }
375   log_timed_action (action, clock);
376 }
377
378 static void action_barrier(const char *const *action){
379   double clock = smpi_process_simulated_elapsed();
380 #ifdef HAVE_TRACING
381   int rank = smpi_comm_rank(MPI_COMM_WORLD);
382   TRACE_smpi_computing_out(rank);
383   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, smpi_comm_size(MPI_COMM_WORLD));
384 #endif
385   smpi_mpi_barrier(MPI_COMM_WORLD);
386 #ifdef HAVE_TRACING
387   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
388   TRACE_smpi_computing_in(rank);
389 #endif
390
391   log_timed_action (action, clock);
392 }
393
394
395 static void action_bcast(const char *const *action)
396 {
397   double size = parse_double(action[2]);
398   double clock = smpi_process_simulated_elapsed();
399   int root=0;
400   /*
401    * Initialize MPI_CURRENT_TYPE in order to decrease
402    * the number of the checks
403    * */
404   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
405
406   if(action[3]) {
407     root= atoi(action[3]);
408     if(action[4]) {
409       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
410     }
411   }
412
413 #ifdef HAVE_TRACING
414   int rank = smpi_comm_rank(MPI_COMM_WORLD);
415   TRACE_smpi_computing_out(rank);
416   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
417   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,size*smpi_datatype_size(MPI_CURRENT_TYPE));
418 #endif
419
420   smpi_mpi_bcast(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
421 #ifdef HAVE_TRACING
422   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
423   TRACE_smpi_computing_in(rank);
424 #endif
425
426   log_timed_action (action, clock);
427 }
428
429 static void action_reduce(const char *const *action)
430 {
431   double comm_size = parse_double(action[2]);
432   double comp_size = parse_double(action[3]);
433   double clock = smpi_process_simulated_elapsed();
434   int root=0;
435   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
436
437   if(action[4]) {
438     root= atoi(action[4]);
439     if(action[5]) {
440       MPI_CURRENT_TYPE=decode_datatype(action[5]);
441     }
442   }
443
444 #ifdef HAVE_TRACING
445   int rank = smpi_comm_rank(MPI_COMM_WORLD);
446   TRACE_smpi_computing_out(rank);
447   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), 0);
448   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,comm_size*smpi_datatype_size(MPI_CURRENT_TYPE));
449 #endif
450    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
451    smpi_execute_flops(comp_size);
452 #ifdef HAVE_TRACING
453   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
454   TRACE_smpi_computing_in(rank);
455 #endif
456
457   log_timed_action (action, clock);
458 }
459
460 static void action_allReduce(const char *const *action) {
461   double comm_size = parse_double(action[2]);
462   double comp_size = parse_double(action[3]);
463
464   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
465   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
466
467   double clock = smpi_process_simulated_elapsed();
468 #ifdef HAVE_TRACING
469   int rank = smpi_comm_rank(MPI_COMM_WORLD);
470   TRACE_smpi_computing_out(rank);
471   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,comp_size*smpi_datatype_size(MPI_CURRENT_TYPE));
472 #endif
473   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
474   smpi_execute_flops(comp_size);
475   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
476 #ifdef HAVE_TRACING
477   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
478   TRACE_smpi_computing_in(rank);
479 #endif
480
481   log_timed_action (action, clock);
482 }
483
484 static void action_allToAll(const char *const *action) {
485   double clock = smpi_process_simulated_elapsed();
486   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
487   int send_size = parse_double(action[2]);
488   int recv_size = parse_double(action[3]);
489   MPI_Datatype MPI_CURRENT_TYPE2;
490
491   if(action[4]) {
492     MPI_CURRENT_TYPE=decode_datatype(action[4]);
493     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
494   }
495   else {
496     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
497     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
498   }
499   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
500   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
501
502 #ifdef HAVE_TRACING
503   int rank = smpi_process_index();
504   TRACE_smpi_computing_out(rank);
505   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,send_size*smpi_datatype_size(MPI_CURRENT_TYPE));
506 #endif
507
508   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
509
510 #ifdef HAVE_TRACING
511   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
512   TRACE_smpi_computing_in(rank);
513 #endif
514
515   log_timed_action (action, clock);
516   xbt_free(send);
517   xbt_free(recv);
518 }
519
520
521 static void action_gather(const char *const *action) {
522   /*
523  The structure of the gather action for the rank 0 (total 4 processes) 
524  is the following:   
525  0 gather 68 68 0 0 0
526
527   where: 
528   1) 68 is the sendcounts
529   2) 68 is the recvcounts
530   3) 0 is the root node
531   4) 0 is the send datatype id, see decode_datatype()
532   5) 0 is the recv datatype id, see decode_datatype()
533   */
534   double clock = smpi_process_simulated_elapsed();
535   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
536   int send_size = parse_double(action[2]);
537   int recv_size = parse_double(action[3]);
538   MPI_Datatype MPI_CURRENT_TYPE2;
539   if(action[5]) {
540     MPI_CURRENT_TYPE=decode_datatype(action[5]);
541     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
542   } else {
543     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
544     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
545   }
546   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
547   void *recv = calloc(recv_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
548
549   int root=atoi(action[4]);
550   int rank = smpi_process_index();
551
552   if(rank==root)
553     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
554
555 #ifdef HAVE_TRACING
556   TRACE_smpi_computing_out(rank);
557   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,send_size*smpi_datatype_size(MPI_CURRENT_TYPE));
558 #endif
559 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
560                 recv, recv_size, MPI_CURRENT_TYPE2,
561                 root, MPI_COMM_WORLD);
562
563 #ifdef HAVE_TRACING
564   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
565   TRACE_smpi_computing_in(rank);
566 #endif
567
568   log_timed_action (action, clock);
569   xbt_free(send);
570   xbt_free(recv);
571 }
572
573
574 static void action_reducescatter(const char *const *action) {
575
576     /*
577  The structure of the reducescatter action for the rank 0 (total 4 processes) 
578  is the following:   
579 0 reduceScatter 275427 275427 275427 204020 11346849 0
580
581   where: 
582   1) The first four values after the name of the action declare the recvcounts array
583   2) The value 11346849 is the amount of instructions
584   3) The last value corresponds to the datatype, see decode_datatype().
585
586   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
587
588    */
589
590   double clock = smpi_process_simulated_elapsed();
591   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
592   int comp_size = parse_double(action[2+comm_size]);
593   int *recvcounts = xbt_new0(int, comm_size);  
594   int *disps = xbt_new0(int, comm_size);  
595   int i=0,recv_sum=0;
596   int root=0;
597   int rank = smpi_process_index();
598
599   if(action[3+comm_size])
600     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
601   else
602     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
603
604   for(i=0;i<comm_size;i++) {
605     recvcounts[i] = atoi(action[i+2]);
606     recv_sum=recv_sum+recvcounts[i];
607     disps[i] = 0;
608   }
609
610 #ifdef HAVE_TRACING
611   TRACE_smpi_computing_out(rank);
612   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, recv_sum*smpi_datatype_size(MPI_CURRENT_TYPE));
613 #endif
614    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
615        root, MPI_COMM_WORLD);
616    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
617                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
618    smpi_execute_flops(comp_size);
619
620
621 #ifdef HAVE_TRACING
622   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
623   TRACE_smpi_computing_in(rank);
624 #endif
625
626   log_timed_action (action, clock);
627 }
628
629
630 static void action_allgatherv(const char *const *action) {
631
632   /*
633  The structure of the allgatherv action for the rank 0 (total 4 processes) 
634  is the following:   
635 0 allGatherV 275427 275427 275427 275427 204020
636
637   where: 
638   1) 275427 is the sendcount
639   2) The next four elements declare the recvcounts array
640   3) No more values mean that the datatype for sent and receive buffer
641   is the default one, see decode_datatype().
642
643    */
644
645   double clock = smpi_process_simulated_elapsed();
646
647   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
648   int i=0;
649   int sendcount=atoi(action[2]);
650   int *recvcounts = xbt_new0(int, comm_size);  
651   int *disps = xbt_new0(int, comm_size);  
652   int recv_sum=0;  
653   MPI_Datatype MPI_CURRENT_TYPE2;
654
655   if(action[3+comm_size]) {
656     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
657     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
658   } else {
659     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
660     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
661   }
662   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
663
664   for(i=0;i<comm_size;i++) {
665     recvcounts[i] = atoi(action[i+3]);
666     recv_sum=recv_sum+recvcounts[i];
667   }
668   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
669
670 #ifdef HAVE_TRACING
671   int rank = MPI_COMM_WORLD != MPI_COMM_NULL ? smpi_process_index() : -1;
672   TRACE_smpi_computing_out(rank);
673   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,sendcount*smpi_datatype_size(MPI_CURRENT_TYPE));
674 #endif
675
676 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
677
678 #ifdef HAVE_TRACING
679   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
680   TRACE_smpi_computing_in(rank);
681 #endif
682
683   log_timed_action (action, clock);
684   xbt_free(sendbuf);
685   xbt_free(recvbuf);
686   xbt_free(recvcounts);
687   xbt_free(disps);
688 }
689
690
691 static void action_allToAllv(const char *const *action) {
692   /*
693  The structure of the allToAllV action for the rank 0 (total 4 processes) 
694  is the following:   
695   0 allToAllV 100 1 7 10 12 100 1 70 10 5
696
697   where: 
698   1) 100 is the size of the send buffer *sizeof(int),
699   2) 1 7 10 12 is the sendcounts array
700   3) 100*sizeof(int) is the size of the receiver buffer
701   4)  1 70 10 5 is the recvcounts array
702
703    */
704
705
706   double clock = smpi_process_simulated_elapsed();
707
708   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
709   int send_buf_size=0,recv_buf_size=0,i=0;
710   int *sendcounts = xbt_new0(int, comm_size);  
711   int *recvcounts = xbt_new0(int, comm_size);  
712   int *senddisps = xbt_new0(int, comm_size);  
713   int *recvdisps = xbt_new0(int, comm_size);  
714
715   MPI_Datatype MPI_CURRENT_TYPE2;
716
717   send_buf_size=parse_double(action[2]);
718   recv_buf_size=parse_double(action[3+comm_size]);
719   if(action[4+2*comm_size]) {
720     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
721     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
722   }
723   else {
724       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
725       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
726   }
727
728   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
729   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
730
731   for(i=0;i<comm_size;i++) {
732     sendcounts[i] = atoi(action[i+3]);
733     recvcounts[i] = atoi(action[i+4+comm_size]);
734   }
735
736
737 #ifdef HAVE_TRACING
738   int rank = MPI_COMM_WORLD != MPI_COMM_NULL ? smpi_process_index() : -1;
739   TRACE_smpi_computing_out(rank);
740   int count=0;
741   for(i=0;i<comm_size;i++) count+=sendcounts[i];
742   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,count*smpi_datatype_size(MPI_CURRENT_TYPE));
743 #endif
744     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
745                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
746                                MPI_COMM_WORLD);
747 #ifdef HAVE_TRACING
748   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
749   TRACE_smpi_computing_in(rank);
750 #endif
751
752   log_timed_action (action, clock);
753   xbt_free(sendbuf);
754   xbt_free(recvbuf);
755   xbt_free(sendcounts);
756   xbt_free(recvcounts);
757   xbt_free(senddisps);
758   xbt_free(recvdisps);
759 }
760
761 void smpi_replay_init(int *argc, char***argv){
762   PMPI_Init(argc, argv);
763   if (!smpi_process_index()){
764     _xbt_replay_action_init();
765     xbt_replay_action_register("init",       action_init);
766     xbt_replay_action_register("finalize",   action_finalize);
767     xbt_replay_action_register("comm_size",  action_comm_size);
768     xbt_replay_action_register("comm_split", action_comm_split);
769     xbt_replay_action_register("comm_dup",   action_comm_dup);
770     xbt_replay_action_register("send",       action_send);
771     xbt_replay_action_register("Isend",      action_Isend);
772     xbt_replay_action_register("recv",       action_recv);
773     xbt_replay_action_register("Irecv",      action_Irecv);
774     xbt_replay_action_register("wait",       action_wait);
775     xbt_replay_action_register("waitAll",    action_waitall);
776     xbt_replay_action_register("barrier",    action_barrier);
777     xbt_replay_action_register("bcast",      action_bcast);
778     xbt_replay_action_register("reduce",     action_reduce);
779     xbt_replay_action_register("allReduce",  action_allReduce);
780     xbt_replay_action_register("allToAll",   action_allToAll);
781     xbt_replay_action_register("allToAllV",  action_allToAllv);
782     xbt_replay_action_register("gather",  action_gather);
783     xbt_replay_action_register("allGatherV",  action_allgatherv);
784     xbt_replay_action_register("reduceScatter",  action_reducescatter);
785     xbt_replay_action_register("compute",    action_compute);
786   }
787
788   xbt_replay_action_runner(*argc, *argv);
789 }
790
791 int smpi_replay_finalize(){
792   double sim_time= 1.;
793   /* One active process will stop. Decrease the counter*/
794   active_processes--;
795   XBT_DEBUG("There are %lu elements in reqq[*]",
796             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
797   xbt_dynar_free(&reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
798   if(!active_processes){
799     /* Last process alive speaking */
800     /* end the simulated timer */
801     sim_time = smpi_process_simulated_elapsed();
802     XBT_INFO("Simulation time %g", sim_time);
803     _xbt_replay_action_exit();
804     xbt_free(reqq);
805     reqq = NULL;
806   }
807   smpi_mpi_barrier(MPI_COMM_WORLD);
808   return PMPI_Finalize();
809 }