Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
for replay and collectives, only allocate buffers once (grow it if needed).
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static int sendbuffer_size=0;
22 char* sendbuffer=NULL;
23 static int recvbuffer_size=0;
24 char* recvbuffer=NULL;
25
26 static void log_timed_action (const char *const *action, double clock){
27   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
28     char *name = xbt_str_join_array(action, " ");
29     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
30     free(name);
31   }
32 }
33
34 //allocate a single buffer for all sends, growing it if needed
35 static void* get_sendbuffer(int size){
36   if (sendbuffer_size<size){
37     sendbuffer=xbt_realloc(sendbuffer,size);
38     sendbuffer_size=size;
39   }
40   return sendbuffer;
41 }
42 //allocate a single buffer for all recv
43 static void* get_recvbuffer(int size){
44   if (recvbuffer_size<size){
45     recvbuffer=xbt_realloc(recvbuffer,size);
46     recvbuffer_size=size;
47   }
48   return sendbuffer;
49 }
50
51 /* Helper function */
52 static double parse_double(const char *string)
53 {
54   double value;
55   char *endptr;
56   value = strtod(string, &endptr);
57   if (*endptr != '\0')
58     THROWF(unknown_error, 0, "%s is not a double", string);
59   return value;
60 }
61
62 static MPI_Datatype decode_datatype(const char *const action)
63 {
64 // Declared datatypes,
65
66   switch(atoi(action))
67   {
68     case 0:
69       MPI_CURRENT_TYPE=MPI_DOUBLE;
70       break;
71     case 1:
72       MPI_CURRENT_TYPE=MPI_INT;
73       break;
74     case 2:
75       MPI_CURRENT_TYPE=MPI_CHAR;
76       break;
77     case 3:
78       MPI_CURRENT_TYPE=MPI_SHORT;
79       break;
80     case 4:
81       MPI_CURRENT_TYPE=MPI_LONG;
82       break;
83     case 5:
84       MPI_CURRENT_TYPE=MPI_FLOAT;
85       break;
86     case 6:
87       MPI_CURRENT_TYPE=MPI_BYTE;
88       break;
89     default:
90       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
91
92   }
93    return MPI_CURRENT_TYPE;
94 }
95
96
97 const char* encode_datatype(MPI_Datatype datatype)
98 {
99
100   //default type for output is set to MPI_BYTE
101   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
102   if (datatype==MPI_BYTE){
103       return "";
104   }
105   if(datatype==MPI_DOUBLE)
106       return "0";
107   if(datatype==MPI_INT)
108       return "1";
109   if(datatype==MPI_CHAR)
110       return "2";
111   if(datatype==MPI_SHORT)
112       return "3";
113   if(datatype==MPI_LONG)
114     return "4";
115   if(datatype==MPI_FLOAT)
116       return "5";
117
118   // default - not implemented.
119   // do not warn here as we pass in this function even for other trace formats
120   return "-1";
121 }
122
123 static void action_init(const char *const *action)
124 {
125   int i;
126   XBT_DEBUG("Initialize the counters");
127
128   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
129   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
130
131   /* start a simulated timer */
132   smpi_process_simulated_start();
133   /*initialize the number of active processes */
134   active_processes = smpi_process_count();
135
136   if (!reqq) {
137     reqq=xbt_new0(xbt_dynar_t,active_processes);
138
139     for(i=0;i<active_processes;i++){
140       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
141     }
142   }
143 }
144
145 static void action_finalize(const char *const *action)
146 {
147 }
148
149 static void action_comm_size(const char *const *action)
150 {
151   double clock = smpi_process_simulated_elapsed();
152
153   communicator_size = parse_double(action[2]);
154   log_timed_action (action, clock);
155 }
156
157 static void action_comm_split(const char *const *action)
158 {
159   double clock = smpi_process_simulated_elapsed();
160
161   log_timed_action (action, clock);
162 }
163
164 static void action_comm_dup(const char *const *action)
165 {
166   double clock = smpi_process_simulated_elapsed();
167
168   log_timed_action (action, clock);
169 }
170
171 static void action_compute(const char *const *action)
172 {
173   double clock = smpi_process_simulated_elapsed();
174   double flops= parse_double(action[2]);
175 #ifdef HAVE_TRACING
176   int rank = smpi_process_index();
177   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
178   extra->type=TRACING_COMPUTING;
179   extra->comp_size=flops;
180   TRACE_smpi_computing_in(rank, extra);
181 #endif
182   smpi_execute_flops(flops);
183 #ifdef HAVE_TRACING
184   TRACE_smpi_computing_out(rank);
185 #endif
186
187   log_timed_action (action, clock);
188 }
189
190 static void action_send(const char *const *action)
191 {
192   int to = atoi(action[2]);
193   double size=parse_double(action[3]);
194   double clock = smpi_process_simulated_elapsed();
195
196   if(action[4]) {
197     MPI_CURRENT_TYPE=decode_datatype(action[4]);
198   } else {
199     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
200   }
201
202 #ifdef HAVE_TRACING
203   int rank = smpi_process_index();
204
205   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
206   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
207   extra->type = TRACING_SEND;
208   extra->send_size = size;
209   extra->src = rank;
210   extra->dst = dst_traced;
211   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
212   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
213   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
214 #endif
215
216   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
217
218   log_timed_action (action, clock);
219
220   #ifdef HAVE_TRACING
221   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
222 #endif
223
224 }
225
226 static void action_Isend(const char *const *action)
227 {
228   int to = atoi(action[2]);
229   double size=parse_double(action[3]);
230   double clock = smpi_process_simulated_elapsed();
231   MPI_Request request;
232
233   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
234   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
235
236 #ifdef HAVE_TRACING
237   int rank = smpi_process_index();
238   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
239   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
240   extra->type = TRACING_ISEND;
241   extra->send_size = size;
242   extra->src = rank;
243   extra->dst = dst_traced;
244   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
245   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
246   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
247 #endif
248
249   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
250
251 #ifdef HAVE_TRACING
252   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
253   request->send = 1;
254 #endif
255
256   xbt_dynar_push(reqq[smpi_process_index()],&request);
257
258   log_timed_action (action, clock);
259 }
260
261 static void action_recv(const char *const *action) {
262   int from = atoi(action[2]);
263   double size=parse_double(action[3]);
264   double clock = smpi_process_simulated_elapsed();
265   MPI_Status status;
266
267   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
268   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
269
270 #ifdef HAVE_TRACING
271   int rank = smpi_process_index();
272   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
273
274   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
275   extra->type = TRACING_RECV;
276   extra->send_size = size;
277   extra->src = src_traced;
278   extra->dst = rank;
279   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
280   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
281 #endif
282
283   //unknow size from the receiver pov
284   if(size==-1){
285       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
286       size=status.count;
287   }
288
289   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
290
291 #ifdef HAVE_TRACING
292   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
293   TRACE_smpi_recv(rank, src_traced, rank);
294 #endif
295
296   log_timed_action (action, clock);
297 }
298
299 static void action_Irecv(const char *const *action)
300 {
301   int from = atoi(action[2]);
302   double size=parse_double(action[3]);
303   double clock = smpi_process_simulated_elapsed();
304   MPI_Request request;
305
306   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
307   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
308
309 #ifdef HAVE_TRACING
310   int rank = smpi_process_index();
311   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
312   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
313   extra->type = TRACING_IRECV;
314   extra->send_size = size;
315   extra->src = src_traced;
316   extra->dst = rank;
317   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
318   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
319 #endif
320   MPI_Status status;
321   //unknow size from the receiver pov
322   if(size==-1){
323       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
324       size=status.count;
325   }
326
327   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
328
329 #ifdef HAVE_TRACING
330   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
331   request->recv = 1;
332 #endif
333   xbt_dynar_push(reqq[smpi_process_index()],&request);
334
335   log_timed_action (action, clock);
336 }
337
338 static void action_test(const char *const *action){
339   double clock = smpi_process_simulated_elapsed();
340   MPI_Request request;
341   MPI_Status status;
342   int flag = TRUE;
343
344   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
345   xbt_assert(request != NULL, "found null request in reqq");
346
347 #ifdef HAVE_TRACING
348   int rank = smpi_process_index();
349   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
350   extra->type=TRACING_TEST;
351   TRACE_smpi_testing_in(rank, extra);
352 #endif
353   flag = smpi_mpi_test(&request, &status);
354   XBT_DEBUG("MPI_Test result: %d", flag);
355   /* push back request in dynar to be caught by a subsequent wait. if the test
356    * did succeed, the request is now NULL.
357    */
358   xbt_dynar_push_as(reqq[smpi_process_index()],MPI_Request, request);
359
360 #ifdef HAVE_TRACING
361   TRACE_smpi_testing_out(rank);
362 #endif
363
364   log_timed_action (action, clock);
365 }
366
367 static void action_wait(const char *const *action){
368   double clock = smpi_process_simulated_elapsed();
369   MPI_Request request;
370   MPI_Status status;
371
372   xbt_assert(xbt_dynar_length(reqq[smpi_process_index()]),
373       "action wait not preceded by any irecv or isend: %s",
374       xbt_str_join_array(action," "));
375   request = xbt_dynar_pop_as(reqq[smpi_process_index()],MPI_Request);
376
377   if (!request){
378     /* Assuming that the trace is well formed, this mean the comm might have
379      * been caught by a MPI_test. Then just return.
380      */
381     return;
382   }
383
384 #ifdef HAVE_TRACING
385   int rank = request->comm != MPI_COMM_NULL
386       ? smpi_comm_rank(request->comm)
387       : -1;
388
389   MPI_Group group = smpi_comm_group(request->comm);
390   int src_traced = smpi_group_rank(group, request->src);
391   int dst_traced = smpi_group_rank(group, request->dst);
392   int is_wait_for_receive = request->recv;
393   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
394   extra->type = TRACING_WAIT;
395   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
396 #endif
397   smpi_mpi_wait(&request, &status);
398 #ifdef HAVE_TRACING
399   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
400   if (is_wait_for_receive) {
401     TRACE_smpi_recv(rank, src_traced, dst_traced);
402   }
403 #endif
404
405   log_timed_action (action, clock);
406 }
407
408 static void action_waitall(const char *const *action){
409   double clock = smpi_process_simulated_elapsed();
410   int count_requests=0;
411   unsigned int i=0;
412
413   count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
414
415   if (count_requests>0) {
416     MPI_Request requests[count_requests];
417     MPI_Status status[count_requests];
418
419     /*  The reqq is an array of dynars. Its index corresponds to the rank.
420      Thus each rank saves its own requests to the array request. */
421     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]); 
422
423   #ifdef HAVE_TRACING
424    //save information from requests
425
426    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
427    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
428    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
429    for (i = 0; i < count_requests; i++) {
430     if(requests[i]){
431       int *asrc = xbt_new(int, 1);
432       int *adst = xbt_new(int, 1);
433       int *arecv = xbt_new(int, 1);
434       *asrc = requests[i]->src;
435       *adst = requests[i]->dst;
436       *arecv = requests[i]->recv;
437       xbt_dynar_insert_at(srcs, i, asrc);
438       xbt_dynar_insert_at(dsts, i, adst);
439       xbt_dynar_insert_at(recvs, i, arecv);
440       xbt_free(asrc);
441       xbt_free(adst);
442       xbt_free(arecv);
443     }else {
444       int *t = xbt_new(int, 1);
445       xbt_dynar_insert_at(srcs, i, t);
446       xbt_dynar_insert_at(dsts, i, t);
447       xbt_dynar_insert_at(recvs, i, t);
448       xbt_free(t);
449     }
450    }
451    int rank_traced = smpi_process_index();
452    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
453    extra->type = TRACING_WAITALL;
454    extra->send_size=count_requests;
455    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
456  #endif
457
458     smpi_mpi_waitall(count_requests, requests, status);
459
460   #ifdef HAVE_TRACING
461    for (i = 0; i < count_requests; i++) {
462     int src_traced, dst_traced, is_wait_for_receive;
463     xbt_dynar_get_cpy(srcs, i, &src_traced);
464     xbt_dynar_get_cpy(dsts, i, &dst_traced);
465     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
466     if (is_wait_for_receive) {
467       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
468     }
469    }
470    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
471    //clean-up of dynars
472    xbt_dynar_free(&srcs);
473    xbt_dynar_free(&dsts);
474    xbt_dynar_free(&recvs);
475   #endif
476
477    xbt_dynar_free_container(&(reqq[smpi_process_index()]));
478   }
479   log_timed_action (action, clock);
480 }
481
482 static void action_barrier(const char *const *action){
483   double clock = smpi_process_simulated_elapsed();
484 #ifdef HAVE_TRACING
485   int rank = smpi_process_index();
486   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
487   extra->type = TRACING_BARRIER;
488   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
489 #endif
490   mpi_coll_barrier_fun(MPI_COMM_WORLD);
491 #ifdef HAVE_TRACING
492   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
493 #endif
494
495   log_timed_action (action, clock);
496 }
497
498
499 static void action_bcast(const char *const *action)
500 {
501   double size = parse_double(action[2]);
502   double clock = smpi_process_simulated_elapsed();
503   int root=0;
504   /*
505    * Initialize MPI_CURRENT_TYPE in order to decrease
506    * the number of the checks
507    * */
508   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
509
510   if(action[3]) {
511     root= atoi(action[3]);
512     if(action[4]) {
513       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
514     }
515   }
516
517 #ifdef HAVE_TRACING
518   int rank = smpi_process_index();
519   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
520
521   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
522   extra->type = TRACING_BCAST;
523   extra->send_size = size;
524   extra->root = root_traced;
525   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
526   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
527
528 #endif
529
530   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
531 #ifdef HAVE_TRACING
532   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
533 #endif
534
535   log_timed_action (action, clock);
536 }
537
538 static void action_reduce(const char *const *action)
539 {
540   double comm_size = parse_double(action[2]);
541   double comp_size = parse_double(action[3]);
542   double clock = smpi_process_simulated_elapsed();
543   int root=0;
544   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
545
546   if(action[4]) {
547     root= atoi(action[4]);
548     if(action[5]) {
549       MPI_CURRENT_TYPE=decode_datatype(action[5]);
550     }
551   }
552
553 #ifdef HAVE_TRACING
554   int rank = smpi_process_index();
555   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
556   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
557   extra->type = TRACING_REDUCE;
558   extra->send_size = comm_size;
559   extra->comp_size = comp_size;
560   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
561   extra->root = root_traced;
562
563   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
564 #endif
565    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
566    smpi_execute_flops(comp_size);
567 #ifdef HAVE_TRACING
568   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
569 #endif
570
571   log_timed_action (action, clock);
572 }
573
574 static void action_allReduce(const char *const *action) {
575   double comm_size = parse_double(action[2]);
576   double comp_size = parse_double(action[3]);
577
578   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
579   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
580
581   double clock = smpi_process_simulated_elapsed();
582 #ifdef HAVE_TRACING
583   int rank = smpi_process_index();
584   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
585   extra->type = TRACING_ALLREDUCE;
586   extra->send_size = comm_size;
587   extra->comp_size = comp_size;
588   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
589
590   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
591 #endif
592   mpi_coll_allreduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
593   smpi_execute_flops(comp_size);
594 #ifdef HAVE_TRACING
595   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
596 #endif
597
598   log_timed_action (action, clock);
599 }
600
601 static void action_allToAll(const char *const *action) {
602   double clock = smpi_process_simulated_elapsed();
603   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
604   int send_size = parse_double(action[2]);
605   int recv_size = parse_double(action[3]);
606   MPI_Datatype MPI_CURRENT_TYPE2;
607
608   if(action[4]) {
609     MPI_CURRENT_TYPE=decode_datatype(action[4]);
610     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
611   }
612   else {
613     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
614     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
615   }
616   void *send = get_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));  
617   void *recv = get_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));  
618
619 #ifdef HAVE_TRACING
620   int rank = smpi_process_index();
621   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
622   extra->type = TRACING_ALLTOALL;
623   extra->send_size = send_size;
624   extra->recv_size = recv_size;
625   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
626   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
627
628   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
629 #endif
630
631   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
632
633 #ifdef HAVE_TRACING
634   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
635 #endif
636
637   log_timed_action (action, clock);
638
639 }
640
641
642 static void action_gather(const char *const *action) {
643   /*
644  The structure of the gather action for the rank 0 (total 4 processes) 
645  is the following:   
646  0 gather 68 68 0 0 0
647
648   where: 
649   1) 68 is the sendcounts
650   2) 68 is the recvcounts
651   3) 0 is the root node
652   4) 0 is the send datatype id, see decode_datatype()
653   5) 0 is the recv datatype id, see decode_datatype()
654   */
655   double clock = smpi_process_simulated_elapsed();
656   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
657   int send_size = parse_double(action[2]);
658   int recv_size = parse_double(action[3]);
659   MPI_Datatype MPI_CURRENT_TYPE2;
660   if(action[5]) {
661     MPI_CURRENT_TYPE=decode_datatype(action[5]);
662     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
663   } else {
664     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
665     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
666   }
667   void *send = get_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
668   void *recv = NULL;
669
670   int root=atoi(action[4]);
671   int rank = smpi_process_index();
672
673   if(rank==root)
674     recv = get_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
675
676 #ifdef HAVE_TRACING
677   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
678   extra->type = TRACING_GATHER;
679   extra->send_size = send_size;
680   extra->recv_size = recv_size;
681   extra->root = root;
682   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
683   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
684
685   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
686 #endif
687   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
688                 recv, recv_size, MPI_CURRENT_TYPE2,
689                 root, MPI_COMM_WORLD);
690
691 #ifdef HAVE_TRACING
692   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
693 #endif
694
695   log_timed_action (action, clock);
696
697 }
698
699
700
701 static void action_gatherv(const char *const *action) {
702   /*
703  The structure of the gatherv action for the rank 0 (total 4 processes)
704  is the following:
705  0 gather 68 68 10 10 10 0 0 0
706
707   where:
708   1) 68 is the sendcount
709   2) 68 10 10 10 is the recvcounts
710   3) 0 is the root node
711   4) 0 is the send datatype id, see decode_datatype()
712   5) 0 is the recv datatype id, see decode_datatype()
713   */
714   double clock = smpi_process_simulated_elapsed();
715   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
716   int send_size = parse_double(action[2]);
717   int *disps = xbt_new0(int, comm_size);
718   int *recvcounts = xbt_new0(int, comm_size);
719   int i=0,recv_sum=0;
720
721   MPI_Datatype MPI_CURRENT_TYPE2;
722   if(action[4+comm_size]) {
723     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
724     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
725   } else {
726     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
727     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
728   }
729   void *send = get_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
730   void *recv = NULL;
731   for(i=0;i<comm_size;i++) {
732     recvcounts[i] = atoi(action[i+3]);
733     recv_sum=recv_sum+recvcounts[i];
734     disps[i] = 0;
735   }
736
737   int root=atoi(action[3+comm_size]);
738   int rank = smpi_process_index();
739
740   if(rank==root)
741     recv = get_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
742
743 #ifdef HAVE_TRACING
744   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
745   extra->type = TRACING_GATHERV;
746   extra->send_size = send_size;
747   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
748   for(i=0; i< comm_size; i++)//copy data to avoid bad free
749     extra->recvcounts[i] = recvcounts[i];
750   extra->root = root;
751   extra->num_processes = comm_size;
752   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
753   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
754
755   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
756 #endif
757 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
758                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
759                 root, MPI_COMM_WORLD);
760
761 #ifdef HAVE_TRACING
762   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
763 #endif
764
765   log_timed_action (action, clock);
766   xbt_free(recvcounts);
767   xbt_free(disps);
768
769 }
770
771 static void action_reducescatter(const char *const *action) {
772
773     /*
774  The structure of the reducescatter action for the rank 0 (total 4 processes) 
775  is the following:   
776 0 reduceScatter 275427 275427 275427 204020 11346849 0
777
778   where: 
779   1) The first four values after the name of the action declare the recvcounts array
780   2) The value 11346849 is the amount of instructions
781   3) The last value corresponds to the datatype, see decode_datatype().
782
783   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
784
785    */
786
787   double clock = smpi_process_simulated_elapsed();
788   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
789   int comp_size = parse_double(action[2+comm_size]);
790   int *recvcounts = xbt_new0(int, comm_size);  
791   int *disps = xbt_new0(int, comm_size);  
792   int i=0;
793   int rank = smpi_process_index();
794
795   if(action[3+comm_size])
796     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
797   else
798     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
799
800   for(i=0;i<comm_size;i++) {
801     recvcounts[i] = atoi(action[i+2]);
802     disps[i] = 0;
803   }
804
805 #ifdef HAVE_TRACING
806   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
807   extra->type = TRACING_REDUCE_SCATTER;
808   extra->send_size = 0;
809   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
810   for(i=0; i< comm_size; i++)//copy data to avoid bad free
811     extra->recvcounts[i] = recvcounts[i];
812   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
813   extra->comp_size = comp_size;
814   extra->num_processes = comm_size;
815
816
817   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
818 #endif
819    mpi_coll_reduce_scatter_fun(NULL, NULL, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
820        MPI_COMM_WORLD);
821    smpi_execute_flops(comp_size);
822
823
824 #ifdef HAVE_TRACING
825   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
826 #endif
827   xbt_free(recvcounts);
828   xbt_free(disps);
829   log_timed_action (action, clock);
830 }
831
832
833 static void action_allgatherv(const char *const *action) {
834
835   /*
836  The structure of the allgatherv action for the rank 0 (total 4 processes) 
837  is the following:   
838 0 allGatherV 275427 275427 275427 275427 204020
839
840   where: 
841   1) 275427 is the sendcount
842   2) The next four elements declare the recvcounts array
843   3) No more values mean that the datatype for sent and receive buffer
844   is the default one, see decode_datatype().
845
846    */
847
848   double clock = smpi_process_simulated_elapsed();
849
850   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
851   int i=0;
852   int sendcount=atoi(action[2]);
853   int *recvcounts = xbt_new0(int, comm_size);  
854   int *disps = xbt_new0(int, comm_size);  
855   int recv_sum=0;  
856   MPI_Datatype MPI_CURRENT_TYPE2;
857
858   if(action[3+comm_size]) {
859     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
860     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
861   } else {
862     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
863     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
864   }
865   void *sendbuf = get_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));    
866
867   for(i=0;i<comm_size;i++) {
868     recvcounts[i] = atoi(action[i+3]);
869     recv_sum=recv_sum+recvcounts[i];
870   }
871   void *recvbuf = get_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));  
872
873 #ifdef HAVE_TRACING
874   int rank = smpi_process_index();
875   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
876   extra->type = TRACING_ALLGATHERV;
877   extra->send_size = sendcount;
878   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
879   for(i=0; i< comm_size; i++)//copy data to avoid bad free
880     extra->recvcounts[i] = recvcounts[i];
881   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
882   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
883   extra->num_processes = comm_size;
884
885   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
886 #endif
887
888   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
889
890 #ifdef HAVE_TRACING
891   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
892 #endif
893
894   log_timed_action (action, clock);
895   xbt_free(recvcounts);
896   xbt_free(disps);
897 }
898
899
900 static void action_allToAllv(const char *const *action) {
901   /*
902  The structure of the allToAllV action for the rank 0 (total 4 processes) 
903  is the following:   
904   0 allToAllV 100 1 7 10 12 100 1 70 10 5
905
906   where: 
907   1) 100 is the size of the send buffer *sizeof(int),
908   2) 1 7 10 12 is the sendcounts array
909   3) 100*sizeof(int) is the size of the receiver buffer
910   4)  1 70 10 5 is the recvcounts array
911
912    */
913
914
915   double clock = smpi_process_simulated_elapsed();
916
917   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
918   int send_buf_size=0,recv_buf_size=0,i=0;
919   int *sendcounts = xbt_new0(int, comm_size);  
920   int *recvcounts = xbt_new0(int, comm_size);  
921   int *senddisps = xbt_new0(int, comm_size);  
922   int *recvdisps = xbt_new0(int, comm_size);  
923
924   MPI_Datatype MPI_CURRENT_TYPE2;
925
926   send_buf_size=parse_double(action[2]);
927   recv_buf_size=parse_double(action[3+comm_size]);
928   if(action[4+2*comm_size]) {
929     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
930     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
931   }
932   else {
933       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
934       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
935   }
936
937   void *sendbuf = get_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));  
938   void *recvbuf  = get_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));  
939
940   for(i=0;i<comm_size;i++) {
941     sendcounts[i] = atoi(action[i+3]);
942     recvcounts[i] = atoi(action[i+4+comm_size]);
943   }
944
945
946 #ifdef HAVE_TRACING
947   int rank = smpi_process_index();
948   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
949   extra->type = TRACING_ALLTOALLV;
950   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
951   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
952   extra->num_processes = comm_size;
953
954   for(i=0; i< comm_size; i++){//copy data to avoid bad free
955     extra->send_size += sendcounts[i];
956     extra->sendcounts[i] = sendcounts[i];
957     extra->recv_size += recvcounts[i];
958     extra->recvcounts[i] = recvcounts[i];
959   }
960   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
961   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
962
963   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
964 #endif
965   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
966                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
967                                MPI_COMM_WORLD);
968 #ifdef HAVE_TRACING
969   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
970 #endif
971
972   log_timed_action (action, clock);
973   xbt_free(sendcounts);
974   xbt_free(recvcounts);
975   xbt_free(senddisps);
976   xbt_free(recvdisps);
977 }
978
979 void smpi_replay_init(int *argc, char***argv){
980   smpi_process_init(argc, argv);
981   smpi_process_mark_as_initialized();
982 #ifdef HAVE_TRACING
983   int rank = smpi_process_index();
984   TRACE_smpi_init(rank);
985   TRACE_smpi_computing_init(rank);
986   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
987   extra->type = TRACING_INIT;
988   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
989   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
990 #endif
991
992   if (!smpi_process_index()){
993     _xbt_replay_action_init();
994     xbt_replay_action_register("init",       action_init);
995     xbt_replay_action_register("finalize",   action_finalize);
996     xbt_replay_action_register("comm_size",  action_comm_size);
997     xbt_replay_action_register("comm_split", action_comm_split);
998     xbt_replay_action_register("comm_dup",   action_comm_dup);
999     xbt_replay_action_register("send",       action_send);
1000     xbt_replay_action_register("Isend",      action_Isend);
1001     xbt_replay_action_register("recv",       action_recv);
1002     xbt_replay_action_register("Irecv",      action_Irecv);
1003     xbt_replay_action_register("test",       action_test);
1004     xbt_replay_action_register("wait",       action_wait);
1005     xbt_replay_action_register("waitAll",    action_waitall);
1006     xbt_replay_action_register("barrier",    action_barrier);
1007     xbt_replay_action_register("bcast",      action_bcast);
1008     xbt_replay_action_register("reduce",     action_reduce);
1009     xbt_replay_action_register("allReduce",  action_allReduce);
1010     xbt_replay_action_register("allToAll",   action_allToAll);
1011     xbt_replay_action_register("allToAllV",  action_allToAllv);
1012     xbt_replay_action_register("gather",  action_gather);
1013     xbt_replay_action_register("gatherV",  action_gatherv);
1014     xbt_replay_action_register("allGatherV",  action_allgatherv);
1015     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1016     xbt_replay_action_register("compute",    action_compute);
1017   }
1018   
1019   //if we have a delayed start, sleep here.
1020   if(*argc>2){
1021     char *endptr;
1022     double value = strtod((*argv)[2], &endptr);
1023     if (*endptr != '\0')
1024       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1025     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1026     smpi_execute_flops(value);
1027   }
1028   xbt_replay_action_runner(*argc, *argv);
1029 }
1030
1031 int smpi_replay_finalize(){
1032   double sim_time= 1.;
1033   /* One active process will stop. Decrease the counter*/
1034   XBT_DEBUG("There are %lu elements in reqq[*]",
1035             xbt_dynar_length(reqq[smpi_process_index()]));
1036   if (!xbt_dynar_is_empty(reqq[smpi_process_index()])){
1037     int count_requests=xbt_dynar_length(reqq[smpi_process_index()]);
1038     MPI_Request requests[count_requests];
1039     MPI_Status status[count_requests];
1040     unsigned int i;
1041
1042     xbt_dynar_foreach(reqq[smpi_process_index()],i,requests[i]);
1043     smpi_mpi_waitall(count_requests, requests, status);
1044     active_processes--;
1045   } else {
1046     active_processes--;
1047   }
1048
1049   xbt_dynar_free_container(&(reqq[smpi_process_index()]));
1050
1051   if(!active_processes){
1052     /* Last process alive speaking */
1053     /* end the simulated timer */
1054     sim_time = smpi_process_simulated_elapsed();
1055     XBT_INFO("Simulation time %f", sim_time);
1056     _xbt_replay_action_exit();
1057     xbt_free(sendbuffer);
1058     xbt_free(recvbuffer);
1059     xbt_free(reqq);
1060     reqq = NULL;
1061   }
1062   mpi_coll_barrier_fun(MPI_COMM_WORLD);
1063 #ifdef HAVE_TRACING
1064   int rank = smpi_process_index();
1065   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1066   extra->type = TRACING_FINALIZE;
1067   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1068 #endif
1069   smpi_process_finalize();
1070 #ifdef HAVE_TRACING
1071   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1072   TRACE_smpi_finalize(smpi_process_index());
1073 #endif
1074   smpi_process_destroy();
1075   return MPI_SUCCESS;
1076 }