Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
11f9d1846bd3bc6cb2cc4a296f098a4aa28e7ed8
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static void log_timed_action (const char *const *action, double clock){
22   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
23     char *name = xbt_str_join_array(action, " ");
24     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
25     free(name);
26   }
27 }
28
29 /* Helper function */
30 static double parse_double(const char *string)
31 {
32   double value;
33   char *endptr;
34   value = strtod(string, &endptr);
35   if (*endptr != '\0')
36     THROWF(unknown_error, 0, "%s is not a double", string);
37   return value;
38 }
39
40 static MPI_Datatype decode_datatype(const char *const action)
41 {
42 // Declared datatypes,
43
44   switch(atoi(action))
45   {
46     case 0:
47       MPI_CURRENT_TYPE=MPI_DOUBLE;
48       break;
49     case 1:
50       MPI_CURRENT_TYPE=MPI_INT;
51       break;
52     case 2:
53       MPI_CURRENT_TYPE=MPI_CHAR;
54       break;
55     case 3:
56       MPI_CURRENT_TYPE=MPI_SHORT;
57       break;
58     case 4:
59       MPI_CURRENT_TYPE=MPI_LONG;
60       break;
61     case 5:
62       MPI_CURRENT_TYPE=MPI_FLOAT;
63       break;
64     case 6:
65       MPI_CURRENT_TYPE=MPI_BYTE;
66       break;
67     default:
68       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
69
70   }
71    return MPI_CURRENT_TYPE;
72 }
73
74
75 const char* encode_datatype(MPI_Datatype datatype)
76 {
77
78   //default type for output is set to MPI_BYTE
79   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
80   if (datatype==MPI_BYTE){
81       return "";
82   }
83   if(datatype==MPI_DOUBLE)
84       return "0";
85   if(datatype==MPI_INT)
86       return "1";
87   if(datatype==MPI_CHAR)
88       return "2";
89   if(datatype==MPI_SHORT)
90       return "3";
91   if(datatype==MPI_LONG)
92     return "4";
93   if(datatype==MPI_FLOAT)
94       return "5";
95
96   // default - not implemented.
97   // do not warn here as we pass in this function even for other trace formats
98   return "-1";
99 }
100
101 static void action_init(const char *const *action)
102 {
103   int i;
104   XBT_DEBUG("Initialize the counters");
105
106   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
107   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
108
109   /* start a simulated timer */
110   smpi_process_simulated_start();
111   /*initialize the number of active processes */
112   active_processes = smpi_process_count();
113
114   if (!reqq) {
115     reqq=xbt_new0(xbt_dynar_t,active_processes);
116
117     for(i=0;i<active_processes;i++){
118       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
119     }
120   }
121 }
122
123 static void action_finalize(const char *const *action)
124 {
125 }
126
127 static void action_comm_size(const char *const *action)
128 {
129   double clock = smpi_process_simulated_elapsed();
130
131   communicator_size = parse_double(action[2]);
132   log_timed_action (action, clock);
133 }
134
135 static void action_comm_split(const char *const *action)
136 {
137   double clock = smpi_process_simulated_elapsed();
138
139   log_timed_action (action, clock);
140 }
141
142 static void action_comm_dup(const char *const *action)
143 {
144   double clock = smpi_process_simulated_elapsed();
145
146   log_timed_action (action, clock);
147 }
148
149 static void action_compute(const char *const *action)
150 {
151   double clock = smpi_process_simulated_elapsed();
152   double flops= parse_double(action[2]);
153 #ifdef HAVE_TRACING
154   int rank = smpi_process_index();
155   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
156   extra->type=TRACING_COMPUTING;
157   extra->comp_size=flops;
158   TRACE_smpi_computing_in(rank, extra);
159 #endif
160   smpi_execute_flops(flops);
161 #ifdef HAVE_TRACING
162   TRACE_smpi_computing_out(rank);
163 #endif
164
165   log_timed_action (action, clock);
166 }
167
168 static void action_send(const char *const *action)
169 {
170   int to = atoi(action[2]);
171   double size=parse_double(action[3]);
172   double clock = smpi_process_simulated_elapsed();
173
174   if(action[4]) {
175     MPI_CURRENT_TYPE=decode_datatype(action[4]);
176   } else {
177     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
178   }
179
180 #ifdef HAVE_TRACING
181   int rank = smpi_process_index();
182
183   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
184   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
185   extra->type = TRACING_SEND;
186   extra->send_size = size;
187   extra->src = rank;
188   extra->dst = dst_traced;
189   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
190   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
191   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
192 #endif
193
194   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
195
196   log_timed_action (action, clock);
197
198   #ifdef HAVE_TRACING
199   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
200 #endif
201
202 }
203
204 static void action_Isend(const char *const *action)
205 {
206   int to = atoi(action[2]);
207   double size=parse_double(action[3]);
208   double clock = smpi_process_simulated_elapsed();
209   MPI_Request request;
210
211   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
212   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
213
214 #ifdef HAVE_TRACING
215   int rank = smpi_process_index();
216   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
217   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
218   extra->type = TRACING_ISEND;
219   extra->send_size = size;
220   extra->src = rank;
221   extra->dst = dst_traced;
222   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
223   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
224   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 #endif
226
227   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
228
229 #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231   request->send = 1;
232 #endif
233
234   xbt_dynar_push(reqq[smpi_process_index()],&request);
235
236   log_timed_action (action, clock);
237 }
238
239 static void action_recv(const char *const *action) {
240   int from = atoi(action[2]);
241   double size=parse_double(action[3]);
242   double clock = smpi_process_simulated_elapsed();
243   MPI_Status status;
244
245   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
246   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
247
248 #ifdef HAVE_TRACING
249   int rank = smpi_process_index();
250   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
251
252   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253   extra->type = TRACING_RECV;
254   extra->send_size = size;
255   extra->src = src_traced;
256   extra->dst = rank;
257   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
258   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
259 #endif
260
261   //unknow size from the receiver pov
262   if(size==-1){
263       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
264       size=status.count;
265   }
266
267   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
268
269 #ifdef HAVE_TRACING
270   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
271   TRACE_smpi_recv(rank, src_traced, rank);
272 #endif
273
274   log_timed_action (action, clock);
275 }
276
277 static void action_Irecv(const char *const *action)
278 {
279   int from = atoi(action[2]);
280   double size=parse_double(action[3]);
281   double clock = smpi_process_simulated_elapsed();
282   MPI_Request request;
283
284   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
285   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286
287 #ifdef HAVE_TRACING
288   int rank = smpi_process_index();
289   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
290   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291   extra->type = TRACING_IRECV;
292   extra->send_size = size;
293   extra->src = src_traced;
294   extra->dst = rank;
295   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
296   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
297 #endif
298   MPI_Status status;
299   //unknow size from the receiver pov
300   if(size==-1){
301       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
302       size=status.count;
303   }
304
305   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
306
307 #ifdef HAVE_TRACING
308   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
309   request->recv = 1;
310 #endif
311   xbt_dynar_push(reqq[smpi_process_index()],&request);
312
313   log_timed_action (action, clock);
314 }
315
316 static void action_test(const char *const *action){
317   double clock = smpi_process_simulated_elapsed();
318   MPI_Request request;
319   MPI_Status status;
320   int flag = TRUE;
321
322   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
323   xbt_assert(request != NULL, "found null request in reqq");
324
325 #ifdef HAVE_TRACING
326   int rank = smpi_process_index();
327   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328   extra->type=TRACING_TEST;
329   TRACE_smpi_testing_in(rank, extra);
330 #endif
331   flag = smpi_mpi_test(&request, &status);
332   XBT_DEBUG("MPI_Test result: %d", flag);
333   /* push back request in dynar to be caught by a subsequent wait. if the test
334    * did succeed, the request is now NULL.
335    */
336   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_testing_out(rank);
340 #endif
341
342   log_timed_action (action, clock);
343 }
344
345 static void action_wait(const char *const *action){
346   double clock = smpi_process_simulated_elapsed();
347   MPI_Request request;
348   MPI_Status status;
349
350   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
351       "action wait not preceded by any irecv or isend: %s",
352       xbt_str_join_array(action," "));
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354
355   if (!request){
356     /* Assuming that the trace is well formed, this mean the comm might have
357      * been caught by a MPI_test. Then just return.
358      */
359     return;
360   }
361
362 #ifdef HAVE_TRACING
363   int rank = request->comm != MPI_COMM_NULL
364       ? smpi_comm_rank(request->comm)
365       : -1;
366
367   MPI_Group group = smpi_comm_group(request->comm);
368   int src_traced = smpi_group_rank(group, request->src);
369   int dst_traced = smpi_group_rank(group, request->dst);
370   int is_wait_for_receive = request->recv;
371   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
372   extra->type = TRACING_WAIT;
373   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
374 #endif
375   smpi_mpi_wait(&request, &status);
376 #ifdef HAVE_TRACING
377   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
378   if (is_wait_for_receive) {
379     TRACE_smpi_recv(rank, src_traced, dst_traced);
380   }
381 #endif
382
383   log_timed_action (action, clock);
384 }
385
386 static void action_waitall(const char *const *action){
387   double clock = smpi_process_simulated_elapsed();
388   int count_requests=0;
389   unsigned int i=0;
390
391   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
392
393   if (count_requests>0) {
394     MPI_Request requests[count_requests];
395     MPI_Status status[count_requests];
396
397     /*  The reqq is an array of dynars. Its index corresponds to the rank.
398      Thus each rank saves its own requests to the array request. */
399     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
400
401   #ifdef HAVE_TRACING
402    //save information from requests
403
404    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
405    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
406    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
407    for (i = 0; i < count_requests; i++) {
408     if(requests[i]){
409       int *asrc = xbt_new(int, 1);
410       int *adst = xbt_new(int, 1);
411       int *arecv = xbt_new(int, 1);
412       *asrc = requests[i]->src;
413       *adst = requests[i]->dst;
414       *arecv = requests[i]->recv;
415       xbt_dynar_insert_at(srcs, i, asrc);
416       xbt_dynar_insert_at(dsts, i, adst);
417       xbt_dynar_insert_at(recvs, i, arecv);
418       xbt_free(asrc);
419       xbt_free(adst);
420       xbt_free(arecv);
421     }else {
422       int *t = xbt_new(int, 1);
423       xbt_dynar_insert_at(srcs, i, t);
424       xbt_dynar_insert_at(dsts, i, t);
425       xbt_dynar_insert_at(recvs, i, t);
426       xbt_free(t);
427     }
428    }
429    int rank_traced = smpi_process_index();
430    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
431    extra->type = TRACING_WAITALL;
432    extra->send_size=count_requests;
433    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
434  #endif
435
436     smpi_mpi_waitall(count_requests, requests, status);
437
438   #ifdef HAVE_TRACING
439    for (i = 0; i < count_requests; i++) {
440     int src_traced, dst_traced, is_wait_for_receive;
441     xbt_dynar_get_cpy(srcs, i, &src_traced);
442     xbt_dynar_get_cpy(dsts, i, &dst_traced);
443     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
444     if (is_wait_for_receive) {
445       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
446     }
447    }
448    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
449    //clean-up of dynars
450    xbt_dynar_free(&srcs);
451    xbt_dynar_free(&dsts);
452    xbt_dynar_free(&recvs);
453   #endif
454
455    xbt_dynar_free_container(&(reqq[smpi_process_index()]));
456   }
457   log_timed_action (action, clock);
458 }
459
460 static void action_barrier(const char *const *action){
461   double clock = smpi_process_simulated_elapsed();
462 #ifdef HAVE_TRACING
463   int rank = smpi_process_index();
464   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
465   extra->type = TRACING_BARRIER;
466   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
467 #endif
468   mpi_coll_barrier_fun(MPI_COMM_WORLD);
469 #ifdef HAVE_TRACING
470   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
471 #endif
472
473   log_timed_action (action, clock);
474 }
475
476
477 static void action_bcast(const char *const *action)
478 {
479   double size = parse_double(action[2]);
480   double clock = smpi_process_simulated_elapsed();
481   int root=0;
482   /*
483    * Initialize MPI_CURRENT_TYPE in order to decrease
484    * the number of the checks
485    * */
486   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
487
488   if(action[3]) {
489     root= atoi(action[3]);
490     if(action[4]) {
491       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
492     }
493   }
494
495 #ifdef HAVE_TRACING
496   int rank = smpi_process_index();
497   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
498
499   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
500   extra->type = TRACING_BCAST;
501   extra->send_size = size;
502   extra->root = root_traced;
503   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
504   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
505
506 #endif
507
508   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
509 #ifdef HAVE_TRACING
510   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
511 #endif
512
513   log_timed_action (action, clock);
514 }
515
516 static void action_reduce(const char *const *action)
517 {
518   double comm_size = parse_double(action[2]);
519   double comp_size = parse_double(action[3]);
520   double clock = smpi_process_simulated_elapsed();
521   int root=0;
522   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
523
524   if(action[4]) {
525     root= atoi(action[4]);
526     if(action[5]) {
527       MPI_CURRENT_TYPE=decode_datatype(action[5]);
528     }
529   }
530
531 #ifdef HAVE_TRACING
532   int rank = smpi_process_index();
533   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
534   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
535   extra->type = TRACING_REDUCE;
536   extra->send_size = comm_size;
537   extra->comp_size = comp_size;
538   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
539   extra->root = root_traced;
540
541   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
542 #endif
543    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
544    smpi_execute_flops(comp_size);
545 #ifdef HAVE_TRACING
546   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
547 #endif
548
549   log_timed_action (action, clock);
550 }
551
552 static void action_allReduce(const char *const *action) {
553   double comm_size = parse_double(action[2]);
554   double comp_size = parse_double(action[3]);
555
556   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
557   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
558
559   double clock = smpi_process_simulated_elapsed();
560 #ifdef HAVE_TRACING
561   int rank = smpi_process_index();
562   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
563   extra->type = TRACING_ALLREDUCE;
564   extra->send_size = comm_size;
565   extra->comp_size = comp_size;
566   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
567
568   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
569 #endif
570   mpi_coll_allreduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
571   smpi_execute_flops(comp_size);
572 #ifdef HAVE_TRACING
573   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
574 #endif
575
576   log_timed_action (action, clock);
577 }
578
579 static void action_allToAll(const char *const *action) {
580   double clock = smpi_process_simulated_elapsed();
581   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
582   int send_size = parse_double(action[2]);
583   int recv_size = parse_double(action[3]);
584   MPI_Datatype MPI_CURRENT_TYPE2;
585
586   if(action[4]) {
587     MPI_CURRENT_TYPE=decode_datatype(action[4]);
588     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
589   }
590   else {
591     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
592     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
593   }
594   void *send = xbt_malloc(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));  
595   void *recv = xbt_malloc(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));  
596
597 #ifdef HAVE_TRACING
598   int rank = smpi_process_index();
599   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
600   extra->type = TRACING_ALLTOALL;
601   extra->send_size = send_size;
602   extra->recv_size = recv_size;
603   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
604   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
605
606   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
607 #endif
608
609   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
610
611 #ifdef HAVE_TRACING
612   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
613 #endif
614
615   log_timed_action (action, clock);
616   xbt_free(send);
617   xbt_free(recv);
618 }
619
620
621 static void action_gather(const char *const *action) {
622   /*
623  The structure of the gather action for the rank 0 (total 4 processes) 
624  is the following:   
625  0 gather 68 68 0 0 0
626
627   where: 
628   1) 68 is the sendcounts
629   2) 68 is the recvcounts
630   3) 0 is the root node
631   4) 0 is the send datatype id, see decode_datatype()
632   5) 0 is the recv datatype id, see decode_datatype()
633   */
634   double clock = smpi_process_simulated_elapsed();
635   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
636   int send_size = parse_double(action[2]);
637   int recv_size = parse_double(action[3]);
638   MPI_Datatype MPI_CURRENT_TYPE2;
639   if(action[5]) {
640     MPI_CURRENT_TYPE=decode_datatype(action[5]);
641     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
642   } else {
643     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
644     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
645   }
646   void *send = xbt_malloc(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
647   void *recv = NULL;
648
649   int root=atoi(action[4]);
650   int rank = smpi_process_index();
651
652   if(rank==root)
653     recv = xbt_malloc(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
654
655 #ifdef HAVE_TRACING
656   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
657   extra->type = TRACING_GATHER;
658   extra->send_size = send_size;
659   extra->recv_size = recv_size;
660   extra->root = root;
661   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
662   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
663
664   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
665 #endif
666   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
667                 recv, recv_size, MPI_CURRENT_TYPE2,
668                 root, MPI_COMM_WORLD);
669
670 #ifdef HAVE_TRACING
671   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
672 #endif
673
674   log_timed_action (action, clock);
675   xbt_free(send);
676   xbt_free(recv);
677 }
678
679
680
681 static void action_gatherv(const char *const *action) {
682   /*
683  The structure of the gatherv action for the rank 0 (total 4 processes)
684  is the following:
685  0 gather 68 68 10 10 10 0 0 0
686
687   where:
688   1) 68 is the sendcount
689   2) 68 10 10 10 is the recvcounts
690   3) 0 is the root node
691   4) 0 is the send datatype id, see decode_datatype()
692   5) 0 is the recv datatype id, see decode_datatype()
693   */
694   double clock = smpi_process_simulated_elapsed();
695   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
696   int send_size = parse_double(action[2]);
697   int *disps = xbt_new0(int, comm_size);
698   int *recvcounts = xbt_new0(int, comm_size);
699   int i=0,recv_sum=0;
700
701   MPI_Datatype MPI_CURRENT_TYPE2;
702   if(action[4+comm_size]) {
703     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
704     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
705   } else {
706     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
707     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
708   }
709   void *send = xbt_malloc(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
710   void *recv = NULL;
711   for(i=0;i<comm_size;i++) {
712     recvcounts[i] = atoi(action[i+3]);
713     recv_sum=recv_sum+recvcounts[i];
714     disps[i] = 0;
715   }
716
717   int root=atoi(action[3+comm_size]);
718   int rank = smpi_process_index();
719
720   if(rank==root)
721     recv = xbt_malloc(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
722
723 #ifdef HAVE_TRACING
724   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
725   extra->type = TRACING_GATHERV;
726   extra->send_size = send_size;
727   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
728   for(i=0; i< comm_size; i++)//copy data to avoid bad free
729     extra->recvcounts[i] = recvcounts[i];
730   extra->root = root;
731   extra->num_processes = comm_size;
732   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
733   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
734
735   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
736 #endif
737 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
738                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
739                 root, MPI_COMM_WORLD);
740
741 #ifdef HAVE_TRACING
742   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
743 #endif
744
745   log_timed_action (action, clock);
746   xbt_free(recvcounts);
747   xbt_free(send);
748   xbt_free(recv);
749   xbt_free(disps);
750
751 }
752
753 static void action_reducescatter(const char *const *action) {
754
755     /*
756  The structure of the reducescatter action for the rank 0 (total 4 processes) 
757  is the following:   
758 0 reduceScatter 275427 275427 275427 204020 11346849 0
759
760   where: 
761   1) The first four values after the name of the action declare the recvcounts array
762   2) The value 11346849 is the amount of instructions
763   3) The last value corresponds to the datatype, see decode_datatype().
764
765   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
766
767    */
768
769   double clock = smpi_process_simulated_elapsed();
770   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
771   int comp_size = parse_double(action[2+comm_size]);
772   int *recvcounts = xbt_new0(int, comm_size);  
773   int *disps = xbt_new0(int, comm_size);  
774   int i=0;
775   int rank = smpi_process_index();
776
777   if(action[3+comm_size])
778     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
779   else
780     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
781
782   for(i=0;i<comm_size;i++) {
783     recvcounts[i] = atoi(action[i+2]);
784     disps[i] = 0;
785   }
786
787 #ifdef HAVE_TRACING
788   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
789   extra->type = TRACING_REDUCE_SCATTER;
790   extra->send_size = 0;
791   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
792   for(i=0; i< comm_size; i++)//copy data to avoid bad free
793     extra->recvcounts[i] = recvcounts[i];
794   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
795   extra->comp_size = comp_size;
796   extra->num_processes = comm_size;
797
798
799   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
800 #endif
801    mpi_coll_reduce_scatter_fun(NULL, NULL, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
802        MPI_COMM_WORLD);
803    smpi_execute_flops(comp_size);
804
805
806 #ifdef HAVE_TRACING
807   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
808 #endif
809   xbt_free(recvcounts);
810   xbt_free(disps);
811   log_timed_action (action, clock);
812 }
813
814
815 static void action_allgatherv(const char *const *action) {
816
817   /*
818  The structure of the allgatherv action for the rank 0 (total 4 processes) 
819  is the following:   
820 0 allGatherV 275427 275427 275427 275427 204020
821
822   where: 
823   1) 275427 is the sendcount
824   2) The next four elements declare the recvcounts array
825   3) No more values mean that the datatype for sent and receive buffer
826   is the default one, see decode_datatype().
827
828    */
829
830   double clock = smpi_process_simulated_elapsed();
831
832   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
833   int i=0;
834   int sendcount=atoi(action[2]);
835   int *recvcounts = xbt_new0(int, comm_size);  
836   int *disps = xbt_new0(int, comm_size);  
837   int recv_sum=0;  
838   MPI_Datatype MPI_CURRENT_TYPE2;
839
840   if(action[3+comm_size]) {
841     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
842     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
843   } else {
844     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
845     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
846   }
847   void *sendbuf = xbt_malloc(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));    
848
849   for(i=0;i<comm_size;i++) {
850     recvcounts[i] = atoi(action[i+3]);
851     recv_sum=recv_sum+recvcounts[i];
852   }
853   void *recvbuf = xbt_malloc(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));  
854
855 #ifdef HAVE_TRACING
856   int rank = smpi_process_index();
857   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
858   extra->type = TRACING_ALLGATHERV;
859   extra->send_size = sendcount;
860   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
861   for(i=0; i< comm_size; i++)//copy data to avoid bad free
862     extra->recvcounts[i] = recvcounts[i];
863   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
864   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
865   extra->num_processes = comm_size;
866
867   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
868 #endif
869
870   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
871
872 #ifdef HAVE_TRACING
873   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
874 #endif
875
876   log_timed_action (action, clock);
877   xbt_free(sendbuf);
878   xbt_free(recvbuf);
879   xbt_free(recvcounts);
880   xbt_free(disps);
881 }
882
883
884 static void action_allToAllv(const char *const *action) {
885   /*
886  The structure of the allToAllV action for the rank 0 (total 4 processes) 
887  is the following:   
888   0 allToAllV 100 1 7 10 12 100 1 70 10 5
889
890   where: 
891   1) 100 is the size of the send buffer *sizeof(int),
892   2) 1 7 10 12 is the sendcounts array
893   3) 100*sizeof(int) is the size of the receiver buffer
894   4)  1 70 10 5 is the recvcounts array
895
896    */
897
898
899   double clock = smpi_process_simulated_elapsed();
900
901   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
902   int send_buf_size=0,recv_buf_size=0,i=0;
903   int *sendcounts = xbt_new0(int, comm_size);  
904   int *recvcounts = xbt_new0(int, comm_size);  
905   int *senddisps = xbt_new0(int, comm_size);  
906   int *recvdisps = xbt_new0(int, comm_size);  
907
908   MPI_Datatype MPI_CURRENT_TYPE2;
909
910   send_buf_size=parse_double(action[2]);
911   recv_buf_size=parse_double(action[3+comm_size]);
912   if(action[4+2*comm_size]) {
913     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
914     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
915   }
916   else {
917       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
918       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
919   }
920
921   void *sendbuf = xbt_malloc(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));  
922   void *recvbuf = xbt_malloc(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));  
923
924   for(i=0;i<comm_size;i++) {
925     sendcounts[i] = atoi(action[i+3]);
926     recvcounts[i] = atoi(action[i+4+comm_size]);
927   }
928
929
930 #ifdef HAVE_TRACING
931   int rank = smpi_process_index();
932   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
933   extra->type = TRACING_ALLTOALLV;
934   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
935   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
936   extra->num_processes = comm_size;
937
938   for(i=0; i< comm_size; i++){//copy data to avoid bad free
939     extra->send_size += sendcounts[i];
940     extra->sendcounts[i] = sendcounts[i];
941     extra->recv_size += recvcounts[i];
942     extra->recvcounts[i] = recvcounts[i];
943   }
944   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
945   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
946
947   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
948 #endif
949   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
950                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
951                                MPI_COMM_WORLD);
952 #ifdef HAVE_TRACING
953   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
954 #endif
955
956   log_timed_action (action, clock);
957   xbt_free(sendbuf);
958   xbt_free(recvbuf);
959   xbt_free(sendcounts);
960   xbt_free(recvcounts);
961   xbt_free(senddisps);
962   xbt_free(recvdisps);
963 }
964
965 void smpi_replay_init(int *argc, char***argv){
966   smpi_process_init(argc, argv);
967   smpi_process_mark_as_initialized();
968 #ifdef HAVE_TRACING
969   int rank = smpi_process_index();
970   TRACE_smpi_init(rank);
971   TRACE_smpi_computing_init(rank);
972   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
973   extra->type = TRACING_INIT;
974   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
975   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
976 #endif
977
978   if (!smpi_process_index()){
979     _xbt_replay_action_init();
980     xbt_replay_action_register("init",       action_init);
981     xbt_replay_action_register("finalize",   action_finalize);
982     xbt_replay_action_register("comm_size",  action_comm_size);
983     xbt_replay_action_register("comm_split", action_comm_split);
984     xbt_replay_action_register("comm_dup",   action_comm_dup);
985     xbt_replay_action_register("send",       action_send);
986     xbt_replay_action_register("Isend",      action_Isend);
987     xbt_replay_action_register("recv",       action_recv);
988     xbt_replay_action_register("Irecv",      action_Irecv);
989     xbt_replay_action_register("test",       action_test);
990     xbt_replay_action_register("wait",       action_wait);
991     xbt_replay_action_register("waitAll",    action_waitall);
992     xbt_replay_action_register("barrier",    action_barrier);
993     xbt_replay_action_register("bcast",      action_bcast);
994     xbt_replay_action_register("reduce",     action_reduce);
995     xbt_replay_action_register("allReduce",  action_allReduce);
996     xbt_replay_action_register("allToAll",   action_allToAll);
997     xbt_replay_action_register("allToAllV",  action_allToAllv);
998     xbt_replay_action_register("gather",  action_gather);
999     xbt_replay_action_register("gatherV",  action_gatherv);
1000     xbt_replay_action_register("allGatherV",  action_allgatherv);
1001     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1002     xbt_replay_action_register("compute",    action_compute);
1003   }
1004
1005   xbt_replay_action_runner(*argc, *argv);
1006 }
1007
1008 int smpi_replay_finalize(){
1009   double sim_time= 1.;
1010   /* One active process will stop. Decrease the counter*/
1011   XBT_DEBUG("There are %lu elements in reqq[*]",
1012             xbt_dynar_length(reqq[smpi_process_index()]));
1013   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1014     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1015     MPI_Request requests[count_requests];
1016     MPI_Status status[count_requests];
1017     unsigned int i;
1018
1019     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1020     smpi_mpi_waitall(count_requests, requests, status);
1021     active_processes--;
1022   } else {
1023     active_processes--;
1024   }
1025
1026   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1027
1028   if(!active_processes){
1029     /* Last process alive speaking */
1030     /* end the simulated timer */
1031     sim_time = smpi_process_simulated_elapsed();
1032     XBT_INFO("Simulation time %f", sim_time);
1033     _xbt_replay_action_exit();
1034     xbt_free(reqq);
1035     reqq = NULL;
1036   }
1037   mpi_coll_barrier_fun(MPI_COMM_WORLD);
1038 #ifdef HAVE_TRACING
1039   int rank = smpi_process_index();
1040   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1041   extra->type = TRACING_FINALIZE;
1042   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1043 #endif
1044   smpi_process_finalize();
1045 #ifdef HAVE_TRACING
1046   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1047   TRACE_smpi_finalize(smpi_process_index());
1048 #endif
1049   smpi_process_destroy();
1050   return MPI_SUCCESS;
1051 }