Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Change way replay is handled, to allow cohabitation between replay and "classic"...
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 void* smpi_get_tmp_sendbuffer(int size){
36   if (!smpi_process_get_replaying())
37         return xbt_malloc(size);
38   if (sendbuffer_size<size){
39     sendbuffer=xbt_realloc(sendbuffer,size);
40     sendbuffer_size=size;
41   }
42   return sendbuffer;
43 }
44 //allocate a single buffer for all recv
45 void* smpi_get_tmp_recvbuffer(int size){
46   if (!smpi_process_get_replaying())
47         return xbt_malloc(size);
48   if (recvbuffer_size<size){
49     recvbuffer=xbt_realloc(recvbuffer,size);
50     recvbuffer_size=size;
51   }
52   return sendbuffer;
53 }
54
55 void smpi_free_tmp_buffer(void* buf){
56   if (!smpi_process_get_replaying())
57     xbt_free(buf);
58 }
59
60 /* Helper function */
61 static double parse_double(const char *string)
62 {
63   double value;
64   char *endptr;
65   value = strtod(string, &endptr);
66   if (*endptr != '\0')
67     THROWF(unknown_error, 0, "%s is not a double", string);
68   return value;
69 }
70
71 static MPI_Datatype decode_datatype(const char *const action)
72 {
73 // Declared datatypes,
74
75   switch(atoi(action))
76   {
77     case 0:
78       MPI_CURRENT_TYPE=MPI_DOUBLE;
79       break;
80     case 1:
81       MPI_CURRENT_TYPE=MPI_INT;
82       break;
83     case 2:
84       MPI_CURRENT_TYPE=MPI_CHAR;
85       break;
86     case 3:
87       MPI_CURRENT_TYPE=MPI_SHORT;
88       break;
89     case 4:
90       MPI_CURRENT_TYPE=MPI_LONG;
91       break;
92     case 5:
93       MPI_CURRENT_TYPE=MPI_FLOAT;
94       break;
95     case 6:
96       MPI_CURRENT_TYPE=MPI_BYTE;
97       break;
98     default:
99       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
100
101   }
102    return MPI_CURRENT_TYPE;
103 }
104
105
106 const char* encode_datatype(MPI_Datatype datatype)
107 {
108
109   //default type for output is set to MPI_BYTE
110   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
111   if (datatype==MPI_BYTE){
112       return "";
113   }
114   if(datatype==MPI_DOUBLE)
115       return "0";
116   if(datatype==MPI_INT)
117       return "1";
118   if(datatype==MPI_CHAR)
119       return "2";
120   if(datatype==MPI_SHORT)
121       return "3";
122   if(datatype==MPI_LONG)
123     return "4";
124   if(datatype==MPI_FLOAT)
125       return "5";
126
127   // default - not implemented.
128   // do not warn here as we pass in this function even for other trace formats
129   return "-1";
130 }
131
132 static void action_init(const char *const *action)
133 {
134   int i;
135   XBT_DEBUG("Initialize the counters");
136
137   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
138   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
139
140   /* start a simulated timer */
141   smpi_process_simulated_start();
142   /*initialize the number of active processes */
143   active_processes = smpi_process_count();
144
145   if (!reqq) {
146     reqq=xbt_new0(xbt_dynar_t,active_processes);
147
148     for(i=0;i<active_processes;i++){
149       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
150     }
151   }
152 }
153
154 static void action_finalize(const char *const *action)
155 {
156 }
157
158 static void action_comm_size(const char *const *action)
159 {
160   double clock = smpi_process_simulated_elapsed();
161
162   communicator_size = parse_double(action[2]);
163   log_timed_action (action, clock);
164 }
165
166 static void action_comm_split(const char *const *action)
167 {
168   double clock = smpi_process_simulated_elapsed();
169
170   log_timed_action (action, clock);
171 }
172
173 static void action_comm_dup(const char *const *action)
174 {
175   double clock = smpi_process_simulated_elapsed();
176
177   log_timed_action (action, clock);
178 }
179
180 static void action_compute(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183   double flops= parse_double(action[2]);
184 #ifdef HAVE_TRACING
185   int rank = smpi_process_index();
186   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
187   extra->type=TRACING_COMPUTING;
188   extra->comp_size=flops;
189   TRACE_smpi_computing_in(rank, extra);
190 #endif
191   smpi_execute_flops(flops);
192 #ifdef HAVE_TRACING
193   TRACE_smpi_computing_out(rank);
194 #endif
195
196   log_timed_action (action, clock);
197 }
198
199 static void action_send(const char *const *action)
200 {
201   int to = atoi(action[2]);
202   double size=parse_double(action[3]);
203   double clock = smpi_process_simulated_elapsed();
204
205   if(action[4]) {
206     MPI_CURRENT_TYPE=decode_datatype(action[4]);
207   } else {
208     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
209   }
210
211 #ifdef HAVE_TRACING
212   int rank = smpi_process_index();
213
214   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
215   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
216   extra->type = TRACING_SEND;
217   extra->send_size = size;
218   extra->src = rank;
219   extra->dst = dst_traced;
220   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
221   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
222   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
223 #endif
224
225   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
226
227   log_timed_action (action, clock);
228
229   #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231 #endif
232
233 }
234
235 static void action_Isend(const char *const *action)
236 {
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process_simulated_elapsed();
240   MPI_Request request;
241
242   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
243   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
244
245 #ifdef HAVE_TRACING
246   int rank = smpi_process_index();
247   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
248   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
249   extra->type = TRACING_ISEND;
250   extra->send_size = size;
251   extra->src = rank;
252   extra->dst = dst_traced;
253   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
254   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
255   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
256 #endif
257
258   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
259
260 #ifdef HAVE_TRACING
261   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
262   request->send = 1;
263 #endif
264
265   xbt_dynar_push(reqq[smpi_process_index()],&request);
266
267   log_timed_action (action, clock);
268 }
269
270 static void action_recv(const char *const *action) {
271   int from = atoi(action[2]);
272   double size=parse_double(action[3]);
273   double clock = smpi_process_simulated_elapsed();
274   MPI_Status status;
275
276   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
277   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279 #ifdef HAVE_TRACING
280   int rank = smpi_process_index();
281   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
282
283   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
284   extra->type = TRACING_RECV;
285   extra->send_size = size;
286   extra->src = src_traced;
287   extra->dst = rank;
288   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
289   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
290 #endif
291
292   //unknow size from the receiver pov
293   if(size==-1){
294       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
295       size=status.count;
296   }
297
298   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
299
300 #ifdef HAVE_TRACING
301   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
302   TRACE_smpi_recv(rank, src_traced, rank);
303 #endif
304
305   log_timed_action (action, clock);
306 }
307
308 static void action_Irecv(const char *const *action)
309 {
310   int from = atoi(action[2]);
311   double size=parse_double(action[3]);
312   double clock = smpi_process_simulated_elapsed();
313   MPI_Request request;
314
315   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
316   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
317
318 #ifdef HAVE_TRACING
319   int rank = smpi_process_index();
320   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
321   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
322   extra->type = TRACING_IRECV;
323   extra->send_size = size;
324   extra->src = src_traced;
325   extra->dst = rank;
326   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
327   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
328 #endif
329   MPI_Status status;
330   //unknow size from the receiver pov
331   if(size==-1){
332       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
333       size=status.count;
334   }
335
336   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
340   request->recv = 1;
341 #endif
342   xbt_dynar_push(reqq[smpi_process_index()],&request);
343
344   log_timed_action (action, clock);
345 }
346
347 static void action_test(const char *const *action){
348   double clock = smpi_process_simulated_elapsed();
349   MPI_Request request;
350   MPI_Status status;
351   int flag = TRUE;
352
353   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
354   xbt_assert(request != NULL, "found null request in reqq");
355
356 #ifdef HAVE_TRACING
357   int rank = smpi_process_index();
358   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
359   extra->type=TRACING_TEST;
360   TRACE_smpi_testing_in(rank, extra);
361 #endif
362   flag = smpi_mpi_test(&request, &status);
363   XBT_DEBUG("MPI_Test result: %d", flag);
364   /* push back request in dynar to be caught by a subsequent wait. if the test
365    * did succeed, the request is now NULL.
366    */
367   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
368
369 #ifdef HAVE_TRACING
370   TRACE_smpi_testing_out(rank);
371 #endif
372
373   log_timed_action (action, clock);
374 }
375
376 static void action_wait(const char *const *action){
377   double clock = smpi_process_simulated_elapsed();
378   MPI_Request request;
379   MPI_Status status;
380
381   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
382       "action wait not preceded by any irecv or isend: %s",
383       xbt_str_join_array(action," "));
384   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
385
386   if (!request){
387     /* Assuming that the trace is well formed, this mean the comm might have
388      * been caught by a MPI_test. Then just return.
389      */
390     return;
391   }
392
393 #ifdef HAVE_TRACING
394   int rank = request->comm != MPI_COMM_NULL
395       ? smpi_comm_rank(request->comm)
396       : -1;
397
398   MPI_Group group = smpi_comm_group(request->comm);
399   int src_traced = smpi_group_rank(group, request->src);
400   int dst_traced = smpi_group_rank(group, request->dst);
401   int is_wait_for_receive = request->recv;
402   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
403   extra->type = TRACING_WAIT;
404   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
405 #endif
406   smpi_mpi_wait(&request, &status);
407 #ifdef HAVE_TRACING
408   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
409   if (is_wait_for_receive) {
410     TRACE_smpi_recv(rank, src_traced, dst_traced);
411   }
412 #endif
413
414   log_timed_action (action, clock);
415 }
416
417 static void action_waitall(const char *const *action){
418   double clock = smpi_process_simulated_elapsed();
419   int count_requests=0;
420   unsigned int i=0;
421
422   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
423
424   if (count_requests>0) {
425     MPI_Request requests[count_requests];
426     MPI_Status status[count_requests];
427
428     /*  The reqq is an array of dynars. Its index corresponds to the rank.
429      Thus each rank saves its own requests to the array request. */
430     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
431
432   #ifdef HAVE_TRACING
433    //save information from requests
434
435    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
436    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
437    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
438    for (i = 0; i < count_requests; i++) {
439     if(requests[i]){
440       int *asrc = xbt_new(int, 1);
441       int *adst = xbt_new(int, 1);
442       int *arecv = xbt_new(int, 1);
443       *asrc = requests[i]->src;
444       *adst = requests[i]->dst;
445       *arecv = requests[i]->recv;
446       xbt_dynar_insert_at(srcs, i, asrc);
447       xbt_dynar_insert_at(dsts, i, adst);
448       xbt_dynar_insert_at(recvs, i, arecv);
449       xbt_free(asrc);
450       xbt_free(adst);
451       xbt_free(arecv);
452     }else {
453       int *t = xbt_new(int, 1);
454       xbt_dynar_insert_at(srcs, i, t);
455       xbt_dynar_insert_at(dsts, i, t);
456       xbt_dynar_insert_at(recvs, i, t);
457       xbt_free(t);
458     }
459    }
460    int rank_traced = smpi_process_index();
461    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
462    extra->type = TRACING_WAITALL;
463    extra->send_size=count_requests;
464    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
465  #endif
466
467     smpi_mpi_waitall(count_requests, requests, status);
468
469   #ifdef HAVE_TRACING
470    for (i = 0; i < count_requests; i++) {
471     int src_traced, dst_traced, is_wait_for_receive;
472     xbt_dynar_get_cpy(srcs, i, &src_traced);
473     xbt_dynar_get_cpy(dsts, i, &dst_traced);
474     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
475     if (is_wait_for_receive) {
476       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
477     }
478    }
479    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
480    //clean-up of dynars
481    xbt_dynar_free(&srcs);
482    xbt_dynar_free(&dsts);
483    xbt_dynar_free(&recvs);
484   #endif
485
486    xbt_dynar_free_container(&(reqq[smpi_process_index()]));
487   }
488   log_timed_action (action, clock);
489 }
490
491 static void action_barrier(const char *const *action){
492   double clock = smpi_process_simulated_elapsed();
493 #ifdef HAVE_TRACING
494   int rank = smpi_process_index();
495   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
496   extra->type = TRACING_BARRIER;
497   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
498 #endif
499   mpi_coll_barrier_fun(MPI_COMM_WORLD);
500 #ifdef HAVE_TRACING
501   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
502 #endif
503
504   log_timed_action (action, clock);
505 }
506
507
508 static void action_bcast(const char *const *action)
509 {
510   double size = parse_double(action[2]);
511   double clock = smpi_process_simulated_elapsed();
512   int root=0;
513   /*
514    * Initialize MPI_CURRENT_TYPE in order to decrease
515    * the number of the checks
516    * */
517   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
518
519   if(action[3]) {
520     root= atoi(action[3]);
521     if(action[4]) {
522       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
523     }
524   }
525
526 #ifdef HAVE_TRACING
527   int rank = smpi_process_index();
528   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
529
530   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
531   extra->type = TRACING_BCAST;
532   extra->send_size = size;
533   extra->root = root_traced;
534   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
535   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
536
537 #endif
538
539   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
540 #ifdef HAVE_TRACING
541   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
542 #endif
543
544   log_timed_action (action, clock);
545 }
546
547 static void action_reduce(const char *const *action)
548 {
549   double comm_size = parse_double(action[2]);
550   double comp_size = parse_double(action[3]);
551   double clock = smpi_process_simulated_elapsed();
552   int root=0;
553   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
554
555   if(action[4]) {
556     root= atoi(action[4]);
557     if(action[5]) {
558       MPI_CURRENT_TYPE=decode_datatype(action[5]);
559     }
560   }
561
562 #ifdef HAVE_TRACING
563   int rank = smpi_process_index();
564   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
565   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
566   extra->type = TRACING_REDUCE;
567   extra->send_size = comm_size;
568   extra->comp_size = comp_size;
569   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
570   extra->root = root_traced;
571
572   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
573 #endif
574    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
575    smpi_execute_flops(comp_size);
576 #ifdef HAVE_TRACING
577   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
578 #endif
579
580   log_timed_action (action, clock);
581 }
582
583 static void action_allReduce(const char *const *action) {
584   double comm_size = parse_double(action[2]);
585   double comp_size = parse_double(action[3]);
586
587   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
588   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
589
590   double clock = smpi_process_simulated_elapsed();
591 #ifdef HAVE_TRACING
592   int rank = smpi_process_index();
593   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
594   extra->type = TRACING_ALLREDUCE;
595   extra->send_size = comm_size;
596   extra->comp_size = comp_size;
597   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
598
599   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
600 #endif
601   mpi_coll_allreduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
602   smpi_execute_flops(comp_size);
603 #ifdef HAVE_TRACING
604   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
605 #endif
606
607   log_timed_action (action, clock);
608 }
609
610 static void action_allToAll(const char *const *action) {
611   double clock = smpi_process_simulated_elapsed();
612   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
613   int send_size = parse_double(action[2]);
614   int recv_size = parse_double(action[3]);
615   MPI_Datatype MPI_CURRENT_TYPE2;
616
617   if(action[4]) {
618     MPI_CURRENT_TYPE=decode_datatype(action[4]);
619     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
620   }
621   else {
622     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
623     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
624   }
625   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
626   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
627
628 #ifdef HAVE_TRACING
629   int rank = smpi_process_index();
630   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
631   extra->type = TRACING_ALLTOALL;
632   extra->send_size = send_size;
633   extra->recv_size = recv_size;
634   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
635   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
636
637   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
638 #endif
639
640   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
641
642 #ifdef HAVE_TRACING
643   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
644 #endif
645
646   log_timed_action (action, clock);
647
648 }
649
650
651 static void action_gather(const char *const *action) {
652   /*
653  The structure of the gather action for the rank 0 (total 4 processes) 
654  is the following:   
655  0 gather 68 68 0 0 0
656
657   where: 
658   1) 68 is the sendcounts
659   2) 68 is the recvcounts
660   3) 0 is the root node
661   4) 0 is the send datatype id, see decode_datatype()
662   5) 0 is the recv datatype id, see decode_datatype()
663   */
664   double clock = smpi_process_simulated_elapsed();
665   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
666   int send_size = parse_double(action[2]);
667   int recv_size = parse_double(action[3]);
668   MPI_Datatype MPI_CURRENT_TYPE2;
669   if(action[5]) {
670     MPI_CURRENT_TYPE=decode_datatype(action[5]);
671     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
672   } else {
673     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
674     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
675   }
676   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
677   void *recv = NULL;
678
679   int root=atoi(action[4]);
680   int rank = smpi_process_index();
681
682   if(rank==root)
683     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
684
685 #ifdef HAVE_TRACING
686   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
687   extra->type = TRACING_GATHER;
688   extra->send_size = send_size;
689   extra->recv_size = recv_size;
690   extra->root = root;
691   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
692   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
693
694   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
695 #endif
696   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
697                 recv, recv_size, MPI_CURRENT_TYPE2,
698                 root, MPI_COMM_WORLD);
699
700 #ifdef HAVE_TRACING
701   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
702 #endif
703
704   log_timed_action (action, clock);
705
706 }
707
708
709
710 static void action_gatherv(const char *const *action) {
711   /*
712  The structure of the gatherv action for the rank 0 (total 4 processes)
713  is the following:
714  0 gather 68 68 10 10 10 0 0 0
715
716   where:
717   1) 68 is the sendcount
718   2) 68 10 10 10 is the recvcounts
719   3) 0 is the root node
720   4) 0 is the send datatype id, see decode_datatype()
721   5) 0 is the recv datatype id, see decode_datatype()
722   */
723   double clock = smpi_process_simulated_elapsed();
724   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
725   int send_size = parse_double(action[2]);
726   int *disps = xbt_new0(int, comm_size);
727   int *recvcounts = xbt_new0(int, comm_size);
728   int i=0,recv_sum=0;
729
730   MPI_Datatype MPI_CURRENT_TYPE2;
731   if(action[4+comm_size]) {
732     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
733     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
734   } else {
735     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
736     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
737   }
738   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
739   void *recv = NULL;
740   for(i=0;i<comm_size;i++) {
741     recvcounts[i] = atoi(action[i+3]);
742     recv_sum=recv_sum+recvcounts[i];
743     disps[i] = 0;
744   }
745
746   int root=atoi(action[3+comm_size]);
747   int rank = smpi_process_index();
748
749   if(rank==root)
750     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
751
752 #ifdef HAVE_TRACING
753   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
754   extra->type = TRACING_GATHERV;
755   extra->send_size = send_size;
756   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
757   for(i=0; i< comm_size; i++)//copy data to avoid bad free
758     extra->recvcounts[i] = recvcounts[i];
759   extra->root = root;
760   extra->num_processes = comm_size;
761   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
762   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
763
764   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
765 #endif
766 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
767                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
768                 root, MPI_COMM_WORLD);
769
770 #ifdef HAVE_TRACING
771   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
772 #endif
773
774   log_timed_action (action, clock);
775   xbt_free(recvcounts);
776   xbt_free(disps);
777
778 }
779
780 static void action_reducescatter(const char *const *action) {
781
782     /*
783  The structure of the reducescatter action for the rank 0 (total 4 processes) 
784  is the following:   
785 0 reduceScatter 275427 275427 275427 204020 11346849 0
786
787   where: 
788   1) The first four values after the name of the action declare the recvcounts array
789   2) The value 11346849 is the amount of instructions
790   3) The last value corresponds to the datatype, see decode_datatype().
791
792   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
793
794    */
795
796   double clock = smpi_process_simulated_elapsed();
797   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
798   int comp_size = parse_double(action[2+comm_size]);
799   int *recvcounts = xbt_new0(int, comm_size);  
800   int *disps = xbt_new0(int, comm_size);  
801   int i=0;
802   int rank = smpi_process_index();
803
804   if(action[3+comm_size])
805     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
806   else
807     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
808
809   for(i=0;i<comm_size;i++) {
810     recvcounts[i] = atoi(action[i+2]);
811     disps[i] = 0;
812   }
813
814 #ifdef HAVE_TRACING
815   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
816   extra->type = TRACING_REDUCE_SCATTER;
817   extra->send_size = 0;
818   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
819   for(i=0; i< comm_size; i++)//copy data to avoid bad free
820     extra->recvcounts[i] = recvcounts[i];
821   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
822   extra->comp_size = comp_size;
823   extra->num_processes = comm_size;
824
825
826   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
827 #endif
828    mpi_coll_reduce_scatter_fun(NULL, NULL, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
829        MPI_COMM_WORLD);
830    smpi_execute_flops(comp_size);
831
832
833 #ifdef HAVE_TRACING
834   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
835 #endif
836   xbt_free(recvcounts);
837   xbt_free(disps);
838   log_timed_action (action, clock);
839 }
840
841
842 static void action_allgatherv(const char *const *action) {
843
844   /*
845  The structure of the allgatherv action for the rank 0 (total 4 processes) 
846  is the following:   
847 0 allGatherV 275427 275427 275427 275427 204020
848
849   where: 
850   1) 275427 is the sendcount
851   2) The next four elements declare the recvcounts array
852   3) No more values mean that the datatype for sent and receive buffer
853   is the default one, see decode_datatype().
854
855    */
856
857   double clock = smpi_process_simulated_elapsed();
858
859   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
860   int i=0;
861   int sendcount=atoi(action[2]);
862   int *recvcounts = xbt_new0(int, comm_size);  
863   int *disps = xbt_new0(int, comm_size);  
864   int recv_sum=0;  
865   MPI_Datatype MPI_CURRENT_TYPE2;
866
867   if(action[3+comm_size]) {
868     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
869     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
870   } else {
871     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
872     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
873   }
874   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
875
876   for(i=0;i<comm_size;i++) {
877     recvcounts[i] = atoi(action[i+3]);
878     recv_sum=recv_sum+recvcounts[i];
879   }
880   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
881
882 #ifdef HAVE_TRACING
883   int rank = smpi_process_index();
884   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
885   extra->type = TRACING_ALLGATHERV;
886   extra->send_size = sendcount;
887   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
888   for(i=0; i< comm_size; i++)//copy data to avoid bad free
889     extra->recvcounts[i] = recvcounts[i];
890   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
891   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
892   extra->num_processes = comm_size;
893
894   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
895 #endif
896
897   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
898
899 #ifdef HAVE_TRACING
900   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
901 #endif
902
903   log_timed_action (action, clock);
904   xbt_free(recvcounts);
905   xbt_free(disps);
906 }
907
908
909 static void action_allToAllv(const char *const *action) {
910   /*
911  The structure of the allToAllV action for the rank 0 (total 4 processes) 
912  is the following:   
913   0 allToAllV 100 1 7 10 12 100 1 70 10 5
914
915   where: 
916   1) 100 is the size of the send buffer *sizeof(int),
917   2) 1 7 10 12 is the sendcounts array
918   3) 100*sizeof(int) is the size of the receiver buffer
919   4)  1 70 10 5 is the recvcounts array
920
921    */
922
923
924   double clock = smpi_process_simulated_elapsed();
925
926   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
927   int send_buf_size=0,recv_buf_size=0,i=0;
928   int *sendcounts = xbt_new0(int, comm_size);  
929   int *recvcounts = xbt_new0(int, comm_size);  
930   int *senddisps = xbt_new0(int, comm_size);  
931   int *recvdisps = xbt_new0(int, comm_size);  
932
933   MPI_Datatype MPI_CURRENT_TYPE2;
934
935   send_buf_size=parse_double(action[2]);
936   recv_buf_size=parse_double(action[3+comm_size]);
937   if(action[4+2*comm_size]) {
938     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
939     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
940   }
941   else {
942       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
943       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
944   }
945
946   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
947   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
948
949   for(i=0;i<comm_size;i++) {
950     sendcounts[i] = atoi(action[i+3]);
951     recvcounts[i] = atoi(action[i+4+comm_size]);
952   }
953
954
955 #ifdef HAVE_TRACING
956   int rank = smpi_process_index();
957   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
958   extra->type = TRACING_ALLTOALLV;
959   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
960   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
961   extra->num_processes = comm_size;
962
963   for(i=0; i< comm_size; i++){//copy data to avoid bad free
964     extra->send_size += sendcounts[i];
965     extra->sendcounts[i] = sendcounts[i];
966     extra->recv_size += recvcounts[i];
967     extra->recvcounts[i] = recvcounts[i];
968   }
969   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
970   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
971
972   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
973 #endif
974   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
975                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
976                                MPI_COMM_WORLD);
977 #ifdef HAVE_TRACING
978   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
979 #endif
980
981   log_timed_action (action, clock);
982   xbt_free(sendcounts);
983   xbt_free(recvcounts);
984   xbt_free(senddisps);
985   xbt_free(recvdisps);
986 }
987
988 void smpi_replay_init(int *argc, char***argv){
989   smpi_process_init(argc, argv);
990   smpi_process_mark_as_initialized();
991   smpi_process_set_replaying(1);
992 #ifdef HAVE_TRACING
993   int rank = smpi_process_index();
994   TRACE_smpi_init(rank);
995   TRACE_smpi_computing_init(rank);
996   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
997   extra->type = TRACING_INIT;
998   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
999   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1000 #endif
1001
1002   if (!smpi_process_index()){
1003     _xbt_replay_action_init();
1004     xbt_replay_action_register("init",       action_init);
1005     xbt_replay_action_register("finalize",   action_finalize);
1006     xbt_replay_action_register("comm_size",  action_comm_size);
1007     xbt_replay_action_register("comm_split", action_comm_split);
1008     xbt_replay_action_register("comm_dup",   action_comm_dup);
1009     xbt_replay_action_register("send",       action_send);
1010     xbt_replay_action_register("Isend",      action_Isend);
1011     xbt_replay_action_register("recv",       action_recv);
1012     xbt_replay_action_register("Irecv",      action_Irecv);
1013     xbt_replay_action_register("test",       action_test);
1014     xbt_replay_action_register("wait",       action_wait);
1015     xbt_replay_action_register("waitAll",    action_waitall);
1016     xbt_replay_action_register("barrier",    action_barrier);
1017     xbt_replay_action_register("bcast",      action_bcast);
1018     xbt_replay_action_register("reduce",     action_reduce);
1019     xbt_replay_action_register("allReduce",  action_allReduce);
1020     xbt_replay_action_register("allToAll",   action_allToAll);
1021     xbt_replay_action_register("allToAllV",  action_allToAllv);
1022     xbt_replay_action_register("gather",  action_gather);
1023     xbt_replay_action_register("gatherV",  action_gatherv);
1024     xbt_replay_action_register("allGatherV",  action_allgatherv);
1025     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1026     xbt_replay_action_register("compute",    action_compute);
1027   }
1028   
1029   //if we have a delayed start, sleep here.
1030   if(*argc>2){
1031     char *endptr;
1032     double value = strtod((*argv)[2], &endptr);
1033     if (*endptr != '\0')
1034       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1035     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1036     smpi_execute_flops(value);
1037   }
1038   xbt_replay_action_runner(*argc, *argv);
1039 }
1040
1041 int smpi_replay_finalize(){
1042   double sim_time= 1.;
1043   /* One active process will stop. Decrease the counter*/
1044   XBT_DEBUG("There are %lu elements in reqq[*]",
1045             xbt_dynar_length(reqq[smpi_process_index()]));
1046   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1047     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1048     MPI_Request requests[count_requests];
1049     MPI_Status status[count_requests];
1050     unsigned int i;
1051
1052     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1053     smpi_mpi_waitall(count_requests, requests, status);
1054     active_processes--;
1055   } else {
1056     active_processes--;
1057   }
1058
1059   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1060
1061   if(!active_processes){
1062     /* Last process alive speaking */
1063     /* end the simulated timer */
1064     sim_time = smpi_process_simulated_elapsed();
1065     XBT_INFO("Simulation time %f", sim_time);
1066     _xbt_replay_action_exit();
1067     xbt_free(sendbuffer);
1068     xbt_free(recvbuffer);
1069     xbt_free(reqq);
1070     reqq = NULL;
1071   }
1072   mpi_coll_barrier_fun(MPI_COMM_WORLD);
1073 #ifdef HAVE_TRACING
1074   int rank = smpi_process_index();
1075   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1076   extra->type = TRACING_FINALIZE;
1077   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1078 #endif
1079   smpi_process_finalize();
1080 #ifdef HAVE_TRACING
1081   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1082   TRACE_smpi_finalize(smpi_process_index());
1083 #endif
1084   smpi_process_destroy();
1085   return MPI_SUCCESS;
1086 }