Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
have replay use correct algorithm selection logic. Some of them did use only default...
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static void log_timed_action (const char *const *action, double clock){
22   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
23     char *name = xbt_str_join_array(action, " ");
24     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
25     free(name);
26   }
27 }
28
29 /* Helper function */
30 static double parse_double(const char *string)
31 {
32   double value;
33   char *endptr;
34   value = strtod(string, &endptr);
35   if (*endptr != '\0')
36     THROWF(unknown_error, 0, "%s is not a double", string);
37   return value;
38 }
39
40 static MPI_Datatype decode_datatype(const char *const action)
41 {
42 // Declared datatypes,
43
44   switch(atoi(action))
45   {
46     case 0:
47       MPI_CURRENT_TYPE=MPI_DOUBLE;
48       break;
49     case 1:
50       MPI_CURRENT_TYPE=MPI_INT;
51       break;
52     case 2:
53       MPI_CURRENT_TYPE=MPI_CHAR;
54       break;
55     case 3:
56       MPI_CURRENT_TYPE=MPI_SHORT;
57       break;
58     case 4:
59       MPI_CURRENT_TYPE=MPI_LONG;
60       break;
61     case 5:
62       MPI_CURRENT_TYPE=MPI_FLOAT;
63       break;
64     case 6:
65       MPI_CURRENT_TYPE=MPI_BYTE;
66       break;
67     default:
68       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
69
70   }
71    return MPI_CURRENT_TYPE;
72 }
73
74
75 const char* encode_datatype(MPI_Datatype datatype)
76 {
77
78   //default type for output is set to MPI_BYTE
79   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
80   if (datatype==MPI_BYTE){
81       return "";
82   }
83   if(datatype==MPI_DOUBLE)
84       return "0";
85   if(datatype==MPI_INT)
86       return "1";
87   if(datatype==MPI_CHAR)
88       return "2";
89   if(datatype==MPI_SHORT)
90       return "3";
91   if(datatype==MPI_LONG)
92     return "4";
93   if(datatype==MPI_FLOAT)
94       return "5";
95
96   // default - not implemented.
97   // do not warn here as we pass in this function even for other trace formats
98   return "-1";
99 }
100
101 static void action_init(const char *const *action)
102 {
103   int i;
104   XBT_DEBUG("Initialize the counters");
105
106   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
107   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
108
109   /* start a simulated timer */
110   smpi_process_simulated_start();
111   /*initialize the number of active processes */
112   active_processes = smpi_process_count();
113
114   if (!reqq) {
115     reqq=xbt_new0(xbt_dynar_t,active_processes);
116
117     for(i=0;i<active_processes;i++){
118       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
119     }
120   }
121 }
122
123 static void action_finalize(const char *const *action)
124 {
125 }
126
127 static void action_comm_size(const char *const *action)
128 {
129   double clock = smpi_process_simulated_elapsed();
130
131   communicator_size = parse_double(action[2]);
132   log_timed_action (action, clock);
133 }
134
135 static void action_comm_split(const char *const *action)
136 {
137   double clock = smpi_process_simulated_elapsed();
138
139   log_timed_action (action, clock);
140 }
141
142 static void action_comm_dup(const char *const *action)
143 {
144   double clock = smpi_process_simulated_elapsed();
145
146   log_timed_action (action, clock);
147 }
148
149 static void action_compute(const char *const *action)
150 {
151   double clock = smpi_process_simulated_elapsed();
152   double flops= parse_double(action[2]);
153 #ifdef HAVE_TRACING
154   int rank = smpi_comm_rank(MPI_COMM_WORLD);
155   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
156   extra->type=TRACING_COMPUTING;
157   extra->comp_size=flops;
158   TRACE_smpi_computing_in(rank, extra);
159 #endif
160   smpi_execute_flops(flops);
161 #ifdef HAVE_TRACING
162   TRACE_smpi_computing_out(rank);
163 #endif
164
165   log_timed_action (action, clock);
166 }
167
168 static void action_send(const char *const *action)
169 {
170   int to = atoi(action[2]);
171   double size=parse_double(action[3]);
172   double clock = smpi_process_simulated_elapsed();
173
174   if(action[4]) {
175     MPI_CURRENT_TYPE=decode_datatype(action[4]);
176   } else {
177     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
178   }
179
180 #ifdef HAVE_TRACING
181   int rank = smpi_comm_rank(MPI_COMM_WORLD);
182
183   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
184   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
185   extra->type = TRACING_SEND;
186   extra->send_size = size;
187   extra->src = rank;
188   extra->dst = dst_traced;
189   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
190   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
191   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
192 #endif
193
194   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
195
196   log_timed_action (action, clock);
197
198   #ifdef HAVE_TRACING
199   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
200 #endif
201
202 }
203
204 static void action_Isend(const char *const *action)
205 {
206   int to = atoi(action[2]);
207   double size=parse_double(action[3]);
208   double clock = smpi_process_simulated_elapsed();
209   MPI_Request request;
210
211   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
212   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
213
214 #ifdef HAVE_TRACING
215   int rank = smpi_comm_rank(MPI_COMM_WORLD);
216   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
217   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
218   extra->type = TRACING_ISEND;
219   extra->send_size = size;
220   extra->src = rank;
221   extra->dst = dst_traced;
222   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
223   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
224   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 #endif
226
227   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
228
229 #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231   request->send = 1;
232 #endif
233
234   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
235
236   log_timed_action (action, clock);
237 }
238
239 static void action_recv(const char *const *action) {
240   int from = atoi(action[2]);
241   double size=parse_double(action[3]);
242   double clock = smpi_process_simulated_elapsed();
243   MPI_Status status;
244
245   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
246   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
247
248 #ifdef HAVE_TRACING
249   int rank = smpi_comm_rank(MPI_COMM_WORLD);
250   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
251
252   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253   extra->type = TRACING_RECV;
254   extra->send_size = size;
255   extra->src = src_traced;
256   extra->dst = rank;
257   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
258   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
259 #endif
260
261   //unknow size from the receiver pov
262   if(size==-1){
263       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
264       size=status.count;
265   }
266
267   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
268
269 #ifdef HAVE_TRACING
270   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
271   TRACE_smpi_recv(rank, src_traced, rank);
272 #endif
273
274   log_timed_action (action, clock);
275 }
276
277 static void action_Irecv(const char *const *action)
278 {
279   int from = atoi(action[2]);
280   double size=parse_double(action[3]);
281   double clock = smpi_process_simulated_elapsed();
282   MPI_Request request;
283
284   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
285   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286
287 #ifdef HAVE_TRACING
288   int rank = smpi_comm_rank(MPI_COMM_WORLD);
289   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
290   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291   extra->type = TRACING_IRECV;
292   extra->send_size = size;
293   extra->src = src_traced;
294   extra->dst = rank;
295   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
296   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
297 #endif
298   MPI_Status status;
299   //unknow size from the receiver pov
300   if(size==-1){
301       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
302       size=status.count;
303   }
304
305   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
306
307 #ifdef HAVE_TRACING
308   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
309   request->recv = 1;
310 #endif
311   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
312
313   log_timed_action (action, clock);
314 }
315
316 static void action_test(const char *const *action){
317   double clock = smpi_process_simulated_elapsed();
318   MPI_Request request;
319   MPI_Status status;
320   int flag = TRUE;
321
322   request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
323   xbt_assert(request != NULL, "found null request in reqq");
324
325 #ifdef HAVE_TRACING
326   int rank = smpi_comm_rank(MPI_COMM_WORLD);
327   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328   extra->type=TRACING_TEST;
329   TRACE_smpi_testing_in(rank, extra);
330 #endif
331   flag = smpi_mpi_test(&request, &status);
332   XBT_DEBUG("MPI_Test result: %d", flag);
333   /* push back request in dynar to be caught by a subsequent wait. if the test
334    * did succeed, the request is now NULL.
335    */
336   xbt_dynar_push_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request, request);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_testing_out(rank);
340 #endif
341
342   log_timed_action (action, clock);
343 }
344
345 static void action_wait(const char *const *action){
346   double clock = smpi_process_simulated_elapsed();
347   MPI_Request request;
348   MPI_Status status;
349
350   xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
351       "action wait not preceded by any irecv or isend: %s",
352       xbt_str_join_array(action," "));
353   request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
354
355   if (!request){
356     /* Assuming that the trace is well formed, this mean the comm might have
357      * been caught by a MPI_test. Then just return.
358      */
359     return;
360   }
361
362 #ifdef HAVE_TRACING
363   int rank = request->comm != MPI_COMM_NULL
364       ? smpi_comm_rank(request->comm)
365       : -1;
366
367   MPI_Group group = smpi_comm_group(request->comm);
368   int src_traced = smpi_group_rank(group, request->src);
369   int dst_traced = smpi_group_rank(group, request->dst);
370   int is_wait_for_receive = request->recv;
371   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
372   extra->type = TRACING_WAIT;
373   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
374 #endif
375   smpi_mpi_wait(&request, &status);
376 #ifdef HAVE_TRACING
377   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
378   if (is_wait_for_receive) {
379     TRACE_smpi_recv(rank, src_traced, dst_traced);
380   }
381 #endif
382
383   log_timed_action (action, clock);
384 }
385
386 static void action_waitall(const char *const *action){
387   double clock = smpi_process_simulated_elapsed();
388   int count_requests=0;
389   unsigned int i=0;
390
391   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
392
393   if (count_requests>0) {
394     MPI_Request requests[count_requests];
395     MPI_Status status[count_requests];
396
397     /*  The reqq is an array of dynars. Its index corresponds to the rank.
398      Thus each rank saves its own requests to the array request. */
399     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
400
401   #ifdef HAVE_TRACING
402    //save information from requests
403
404    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
405    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
406    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
407    for (i = 0; i < count_requests; i++) {
408     if(requests[i]){
409       int *asrc = xbt_new(int, 1);
410       int *adst = xbt_new(int, 1);
411       int *arecv = xbt_new(int, 1);
412       *asrc = requests[i]->src;
413       *adst = requests[i]->dst;
414       *arecv = requests[i]->recv;
415       xbt_dynar_insert_at(srcs, i, asrc);
416       xbt_dynar_insert_at(dsts, i, adst);
417       xbt_dynar_insert_at(recvs, i, arecv);
418       xbt_free(asrc);
419       xbt_free(adst);
420       xbt_free(arecv);
421     }else {
422       int *t = xbt_new(int, 1);
423       xbt_dynar_insert_at(srcs, i, t);
424       xbt_dynar_insert_at(dsts, i, t);
425       xbt_dynar_insert_at(recvs, i, t);
426       xbt_free(t);
427     }
428    }
429    int rank_traced = smpi_process_index();
430    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
431    extra->type = TRACING_WAITALL;
432    extra->send_size=count_requests;
433    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
434  #endif
435
436     smpi_mpi_waitall(count_requests, requests, status);
437
438   #ifdef HAVE_TRACING
439    for (i = 0; i < count_requests; i++) {
440     int src_traced, dst_traced, is_wait_for_receive;
441     xbt_dynar_get_cpy(srcs, i, &src_traced);
442     xbt_dynar_get_cpy(dsts, i, &dst_traced);
443     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
444     if (is_wait_for_receive) {
445       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
446     }
447    }
448    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
449    //clean-up of dynars
450    xbt_dynar_free(&srcs);
451    xbt_dynar_free(&dsts);
452    xbt_dynar_free(&recvs);
453   #endif
454
455    xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
456   }
457   log_timed_action (action, clock);
458 }
459
460 static void action_barrier(const char *const *action){
461   double clock = smpi_process_simulated_elapsed();
462 #ifdef HAVE_TRACING
463   int rank = smpi_comm_rank(MPI_COMM_WORLD);
464   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
465   extra->type = TRACING_BARRIER;
466   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
467 #endif
468   mpi_coll_barrier_fun(MPI_COMM_WORLD);
469 #ifdef HAVE_TRACING
470   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
471 #endif
472
473   log_timed_action (action, clock);
474 }
475
476
477 static void action_bcast(const char *const *action)
478 {
479   double size = parse_double(action[2]);
480   double clock = smpi_process_simulated_elapsed();
481   int root=0;
482   /*
483    * Initialize MPI_CURRENT_TYPE in order to decrease
484    * the number of the checks
485    * */
486   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
487
488   if(action[3]) {
489     root= atoi(action[3]);
490     if(action[4]) {
491       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
492     }
493   }
494
495 #ifdef HAVE_TRACING
496   int rank = smpi_comm_rank(MPI_COMM_WORLD);
497   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
498
499   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
500   extra->type = TRACING_BCAST;
501   extra->send_size = size;
502   extra->root = root_traced;
503   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
504   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
505
506 #endif
507
508   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
509 #ifdef HAVE_TRACING
510   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
511 #endif
512
513   log_timed_action (action, clock);
514 }
515
516 static void action_reduce(const char *const *action)
517 {
518   double comm_size = parse_double(action[2]);
519   double comp_size = parse_double(action[3]);
520   double clock = smpi_process_simulated_elapsed();
521   int root=0;
522   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
523
524   if(action[4]) {
525     root= atoi(action[4]);
526     if(action[5]) {
527       MPI_CURRENT_TYPE=decode_datatype(action[5]);
528     }
529   }
530
531 #ifdef HAVE_TRACING
532   int rank = smpi_comm_rank(MPI_COMM_WORLD);
533   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
534   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
535   extra->type = TRACING_REDUCE;
536   extra->send_size = comm_size;
537   extra->comp_size = comp_size;
538   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
539   extra->root = root_traced;
540
541   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
542 #endif
543    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
544    smpi_execute_flops(comp_size);
545 #ifdef HAVE_TRACING
546   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
547 #endif
548
549   log_timed_action (action, clock);
550 }
551
552 static void action_allReduce(const char *const *action) {
553   double comm_size = parse_double(action[2]);
554   double comp_size = parse_double(action[3]);
555
556   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
557   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
558
559   double clock = smpi_process_simulated_elapsed();
560 #ifdef HAVE_TRACING
561   int rank = smpi_comm_rank(MPI_COMM_WORLD);
562   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
563   extra->type = TRACING_ALLREDUCE;
564   extra->send_size = comm_size;
565   extra->comp_size = comp_size;
566   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
567
568   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
569 #endif
570   mpi_coll_allreduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
571   smpi_execute_flops(comp_size);
572 #ifdef HAVE_TRACING
573   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
574 #endif
575
576   log_timed_action (action, clock);
577 }
578
579 static void action_allToAll(const char *const *action) {
580   double clock = smpi_process_simulated_elapsed();
581   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
582   int send_size = parse_double(action[2]);
583   int recv_size = parse_double(action[3]);
584   MPI_Datatype MPI_CURRENT_TYPE2;
585
586   if(action[4]) {
587     MPI_CURRENT_TYPE=decode_datatype(action[4]);
588     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
589   }
590   else {
591     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
592     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
593   }
594   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
595   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
596
597 #ifdef HAVE_TRACING
598   int rank = smpi_process_index();
599   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
600   extra->type = TRACING_ALLTOALL;
601   extra->send_size = send_size;
602   extra->recv_size = recv_size;
603   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
604   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
605
606   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
607 #endif
608
609   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
610
611 #ifdef HAVE_TRACING
612   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
613 #endif
614
615   log_timed_action (action, clock);
616   xbt_free(send);
617   xbt_free(recv);
618 }
619
620
621 static void action_gather(const char *const *action) {
622   /*
623  The structure of the gather action for the rank 0 (total 4 processes) 
624  is the following:   
625  0 gather 68 68 0 0 0
626
627   where: 
628   1) 68 is the sendcounts
629   2) 68 is the recvcounts
630   3) 0 is the root node
631   4) 0 is the send datatype id, see decode_datatype()
632   5) 0 is the recv datatype id, see decode_datatype()
633   */
634   double clock = smpi_process_simulated_elapsed();
635   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
636   int send_size = parse_double(action[2]);
637   int recv_size = parse_double(action[3]);
638   MPI_Datatype MPI_CURRENT_TYPE2;
639   if(action[5]) {
640     MPI_CURRENT_TYPE=decode_datatype(action[5]);
641     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
642   } else {
643     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
644     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
645   }
646   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
647   void *recv = NULL;
648
649   int root=atoi(action[4]);
650   int rank = smpi_process_index();
651
652   if(rank==root)
653     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
654
655 #ifdef HAVE_TRACING
656   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
657   extra->type = TRACING_GATHER;
658   extra->send_size = send_size;
659   extra->recv_size = recv_size;
660   extra->root = root;
661   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
662   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
663
664   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
665 #endif
666   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
667                 recv, recv_size, MPI_CURRENT_TYPE2,
668                 root, MPI_COMM_WORLD);
669
670 #ifdef HAVE_TRACING
671   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
672 #endif
673
674   log_timed_action (action, clock);
675   xbt_free(send);
676   xbt_free(recv);
677 }
678
679
680
681 static void action_gatherv(const char *const *action) {
682   /*
683  The structure of the gatherv action for the rank 0 (total 4 processes)
684  is the following:
685  0 gather 68 68 10 10 10 0 0 0
686
687   where:
688   1) 68 is the sendcount
689   2) 68 10 10 10 is the recvcounts
690   3) 0 is the root node
691   4) 0 is the send datatype id, see decode_datatype()
692   5) 0 is the recv datatype id, see decode_datatype()
693   */
694   double clock = smpi_process_simulated_elapsed();
695   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
696   int send_size = parse_double(action[2]);
697   int *disps = xbt_new0(int, comm_size);
698   int *recvcounts = xbt_new0(int, comm_size);
699   int i=0,recv_sum=0;
700
701   MPI_Datatype MPI_CURRENT_TYPE2;
702   if(action[4+comm_size]) {
703     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
704     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
705   } else {
706     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
707     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
708   }
709   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
710   void *recv = NULL;
711   for(i=0;i<comm_size;i++) {
712     recvcounts[i] = atoi(action[i+3]);
713     recv_sum=recv_sum+recvcounts[i];
714     disps[i] = 0;
715   }
716
717   int root=atoi(action[3+comm_size]);
718   int rank = smpi_process_index();
719
720   if(rank==root)
721     recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
722
723 #ifdef HAVE_TRACING
724   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
725   extra->type = TRACING_GATHERV;
726   extra->send_size = send_size;
727   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
728   for(i=0; i< comm_size; i++)//copy data to avoid bad free
729     extra->recvcounts[i] = recvcounts[i];
730   extra->root = root;
731   extra->num_processes = comm_size;
732   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
733   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
734
735   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
736 #endif
737 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
738                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
739                 root, MPI_COMM_WORLD);
740
741 #ifdef HAVE_TRACING
742   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
743 #endif
744
745   log_timed_action (action, clock);
746   xbt_free(recvcounts);
747   xbt_free(send);
748   xbt_free(recv);
749   xbt_free(disps);
750
751 }
752
753 static void action_reducescatter(const char *const *action) {
754
755     /*
756  The structure of the reducescatter action for the rank 0 (total 4 processes) 
757  is the following:   
758 0 reduceScatter 275427 275427 275427 204020 11346849 0
759
760   where: 
761   1) The first four values after the name of the action declare the recvcounts array
762   2) The value 11346849 is the amount of instructions
763   3) The last value corresponds to the datatype, see decode_datatype().
764
765   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
766
767    */
768
769   double clock = smpi_process_simulated_elapsed();
770   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
771   int comp_size = parse_double(action[2+comm_size]);
772   int *recvcounts = xbt_new0(int, comm_size);  
773   int *disps = xbt_new0(int, comm_size);  
774   int i=0,recv_sum=0;
775   int root=0;
776   int rank = smpi_process_index();
777
778   if(action[3+comm_size])
779     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
780   else
781     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
782
783   for(i=0;i<comm_size;i++) {
784     recvcounts[i] = atoi(action[i+2]);
785     recv_sum=recv_sum+recvcounts[i];
786     disps[i] = 0;
787   }
788
789 #ifdef HAVE_TRACING
790   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
791   extra->type = TRACING_REDUCE_SCATTER;
792   extra->send_size = 0;
793   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
794   for(i=0; i< comm_size; i++)//copy data to avoid bad free
795     extra->recvcounts[i] = recvcounts[i];
796   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
797   extra->comp_size = comp_size;
798   extra->num_processes = comm_size;
799
800
801   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
802 #endif
803    mpi_coll_reduce_scatter_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
804        MPI_COMM_WORLD);
805    smpi_execute_flops(comp_size);
806
807
808 #ifdef HAVE_TRACING
809   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
810 #endif
811   xbt_free(recvcounts);
812   xbt_free(disps);
813   log_timed_action (action, clock);
814 }
815
816
817 static void action_allgatherv(const char *const *action) {
818
819   /*
820  The structure of the allgatherv action for the rank 0 (total 4 processes) 
821  is the following:   
822 0 allGatherV 275427 275427 275427 275427 204020
823
824   where: 
825   1) 275427 is the sendcount
826   2) The next four elements declare the recvcounts array
827   3) No more values mean that the datatype for sent and receive buffer
828   is the default one, see decode_datatype().
829
830    */
831
832   double clock = smpi_process_simulated_elapsed();
833
834   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
835   int i=0;
836   int sendcount=atoi(action[2]);
837   int *recvcounts = xbt_new0(int, comm_size);  
838   int *disps = xbt_new0(int, comm_size);  
839   int recv_sum=0;  
840   MPI_Datatype MPI_CURRENT_TYPE2;
841
842   if(action[3+comm_size]) {
843     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
844     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
845   } else {
846     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
847     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
848   }
849   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
850
851   for(i=0;i<comm_size;i++) {
852     recvcounts[i] = atoi(action[i+3]);
853     recv_sum=recv_sum+recvcounts[i];
854   }
855   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
856
857 #ifdef HAVE_TRACING
858   int rank = smpi_process_index();
859   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
860   extra->type = TRACING_ALLGATHERV;
861   extra->send_size = sendcount;
862   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
863   for(i=0; i< comm_size; i++)//copy data to avoid bad free
864     extra->recvcounts[i] = recvcounts[i];
865   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
866   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
867   extra->num_processes = comm_size;
868
869   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
870 #endif
871
872   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
873
874 #ifdef HAVE_TRACING
875   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
876 #endif
877
878   log_timed_action (action, clock);
879   xbt_free(sendbuf);
880   xbt_free(recvbuf);
881   xbt_free(recvcounts);
882   xbt_free(disps);
883 }
884
885
886 static void action_allToAllv(const char *const *action) {
887   /*
888  The structure of the allToAllV action for the rank 0 (total 4 processes) 
889  is the following:   
890   0 allToAllV 100 1 7 10 12 100 1 70 10 5
891
892   where: 
893   1) 100 is the size of the send buffer *sizeof(int),
894   2) 1 7 10 12 is the sendcounts array
895   3) 100*sizeof(int) is the size of the receiver buffer
896   4)  1 70 10 5 is the recvcounts array
897
898    */
899
900
901   double clock = smpi_process_simulated_elapsed();
902
903   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
904   int send_buf_size=0,recv_buf_size=0,i=0;
905   int *sendcounts = xbt_new0(int, comm_size);  
906   int *recvcounts = xbt_new0(int, comm_size);  
907   int *senddisps = xbt_new0(int, comm_size);  
908   int *recvdisps = xbt_new0(int, comm_size);  
909
910   MPI_Datatype MPI_CURRENT_TYPE2;
911
912   send_buf_size=parse_double(action[2]);
913   recv_buf_size=parse_double(action[3+comm_size]);
914   if(action[4+2*comm_size]) {
915     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
916     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
917   }
918   else {
919       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
920       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
921   }
922
923   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
924   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
925
926   for(i=0;i<comm_size;i++) {
927     sendcounts[i] = atoi(action[i+3]);
928     recvcounts[i] = atoi(action[i+4+comm_size]);
929   }
930
931
932 #ifdef HAVE_TRACING
933   int rank = smpi_process_index();
934   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
935   extra->type = TRACING_ALLTOALLV;
936   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
937   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
938   extra->num_processes = comm_size;
939
940   for(i=0; i< comm_size; i++){//copy data to avoid bad free
941     extra->send_size += sendcounts[i];
942     extra->sendcounts[i] = sendcounts[i];
943     extra->recv_size += recvcounts[i];
944     extra->recvcounts[i] = recvcounts[i];
945   }
946   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
947   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
948
949   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
950 #endif
951   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
952                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
953                                MPI_COMM_WORLD);
954 #ifdef HAVE_TRACING
955   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
956 #endif
957
958   log_timed_action (action, clock);
959   xbt_free(sendbuf);
960   xbt_free(recvbuf);
961   xbt_free(sendcounts);
962   xbt_free(recvcounts);
963   xbt_free(senddisps);
964   xbt_free(recvdisps);
965 }
966
967 void smpi_replay_init(int *argc, char***argv){
968   smpi_process_init(argc, argv);
969   smpi_process_mark_as_initialized();
970 #ifdef HAVE_TRACING
971   int rank = smpi_process_index();
972   TRACE_smpi_init(rank);
973   TRACE_smpi_computing_init(rank);
974   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
975   extra->type = TRACING_INIT;
976   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
977   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
978 #endif
979
980   if (!smpi_process_index()){
981     _xbt_replay_action_init();
982     xbt_replay_action_register("init",       action_init);
983     xbt_replay_action_register("finalize",   action_finalize);
984     xbt_replay_action_register("comm_size",  action_comm_size);
985     xbt_replay_action_register("comm_split", action_comm_split);
986     xbt_replay_action_register("comm_dup",   action_comm_dup);
987     xbt_replay_action_register("send",       action_send);
988     xbt_replay_action_register("Isend",      action_Isend);
989     xbt_replay_action_register("recv",       action_recv);
990     xbt_replay_action_register("Irecv",      action_Irecv);
991     xbt_replay_action_register("test",       action_test);
992     xbt_replay_action_register("wait",       action_wait);
993     xbt_replay_action_register("waitAll",    action_waitall);
994     xbt_replay_action_register("barrier",    action_barrier);
995     xbt_replay_action_register("bcast",      action_bcast);
996     xbt_replay_action_register("reduce",     action_reduce);
997     xbt_replay_action_register("allReduce",  action_allReduce);
998     xbt_replay_action_register("allToAll",   action_allToAll);
999     xbt_replay_action_register("allToAllV",  action_allToAllv);
1000     xbt_replay_action_register("gather",  action_gather);
1001     xbt_replay_action_register("gatherV",  action_gatherv);
1002     xbt_replay_action_register("allGatherV",  action_allgatherv);
1003     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1004     xbt_replay_action_register("compute",    action_compute);
1005   }
1006
1007   xbt_replay_action_runner(*argc, *argv);
1008 }
1009
1010 int smpi_replay_finalize(){
1011   double sim_time= 1.;
1012   /* One active process will stop. Decrease the counter*/
1013   XBT_DEBUG("There are %lu elements in reqq[*]",
1014             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
1015   if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
1016     int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
1017     MPI_Request requests[count_requests];
1018     MPI_Status status[count_requests];
1019     unsigned int i;
1020
1021     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
1022     smpi_mpi_waitall(count_requests, requests, status);
1023     active_processes--;
1024   } else {
1025     active_processes--;
1026   }
1027
1028   xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
1029
1030   if(!active_processes){
1031     /* Last process alive speaking */
1032     /* end the simulated timer */
1033     sim_time = smpi_process_simulated_elapsed();
1034     XBT_INFO("Simulation time %f", sim_time);
1035     _xbt_replay_action_exit();
1036     xbt_free(reqq);
1037     reqq = NULL;
1038   }
1039   smpi_mpi_barrier(MPI_COMM_WORLD);
1040 #ifdef HAVE_TRACING
1041   int rank = smpi_process_index();
1042   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1043   extra->type = TRACING_FINALIZE;
1044   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1045 #endif
1046   smpi_process_finalize();
1047 #ifdef HAVE_TRACING
1048   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1049   TRACE_smpi_finalize(smpi_process_index());
1050 #endif
1051   smpi_process_destroy();
1052   return MPI_SUCCESS;
1053 }