Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
model-checker : fix comm determinism detection mechanisms
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
13
14 int communicator_size = 0;
15 static int active_processes = 0;
16 xbt_dynar_t *reqq = NULL;
17
18 MPI_Datatype MPI_DEFAULT_TYPE;
19 MPI_Datatype MPI_CURRENT_TYPE;
20
21 static void log_timed_action (const char *const *action, double clock){
22   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
23     char *name = xbt_str_join_array(action, " ");
24     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
25     free(name);
26   }
27 }
28
29 /* Helper function */
30 static double parse_double(const char *string)
31 {
32   double value;
33   char *endptr;
34   value = strtod(string, &endptr);
35   if (*endptr != '\0')
36     THROWF(unknown_error, 0, "%s is not a double", string);
37   return value;
38 }
39
40 static MPI_Datatype decode_datatype(const char *const action)
41 {
42 // Declared datatypes,
43
44   switch(atoi(action))
45   {
46     case 0:
47       MPI_CURRENT_TYPE=MPI_DOUBLE;
48       break;
49     case 1:
50       MPI_CURRENT_TYPE=MPI_INT;
51       break;
52     case 2:
53       MPI_CURRENT_TYPE=MPI_CHAR;
54       break;
55     case 3:
56       MPI_CURRENT_TYPE=MPI_SHORT;
57       break;
58     case 4:
59       MPI_CURRENT_TYPE=MPI_LONG;
60       break;
61     case 5:
62       MPI_CURRENT_TYPE=MPI_FLOAT;
63       break;
64     case 6:
65       MPI_CURRENT_TYPE=MPI_BYTE;
66       break;
67     default:
68       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
69
70   }
71    return MPI_CURRENT_TYPE;
72 }
73
74
75 const char* encode_datatype(MPI_Datatype datatype)
76 {
77
78   //default type for output is set to MPI_BYTE
79   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
80   if (datatype==MPI_BYTE){
81       return "";
82   }
83   if(datatype==MPI_DOUBLE)
84       return "0";
85   if(datatype==MPI_INT)
86       return "1";
87   if(datatype==MPI_CHAR)
88       return "2";
89   if(datatype==MPI_SHORT)
90       return "3";
91   if(datatype==MPI_LONG)
92     return "4";
93   if(datatype==MPI_FLOAT)
94       return "5";
95
96   // default - not implemented.
97   // do not warn here as we pass in this function even for other trace formats
98   return "-1";
99 }
100
101 static void action_init(const char *const *action)
102 {
103   int i;
104   XBT_DEBUG("Initialize the counters");
105
106   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
107   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
108
109   /* start a simulated timer */
110   smpi_process_simulated_start();
111   /*initialize the number of active processes */
112   active_processes = smpi_process_count();
113
114   if (!reqq) {
115     reqq=xbt_new0(xbt_dynar_t,active_processes);
116
117     for(i=0;i<active_processes;i++){
118       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
119     }
120   }
121 }
122
123 static void action_finalize(const char *const *action)
124 {
125 }
126
127 static void action_comm_size(const char *const *action)
128 {
129   double clock = smpi_process_simulated_elapsed();
130
131   communicator_size = parse_double(action[2]);
132   log_timed_action (action, clock);
133 }
134
135 static void action_comm_split(const char *const *action)
136 {
137   double clock = smpi_process_simulated_elapsed();
138
139   log_timed_action (action, clock);
140 }
141
142 static void action_comm_dup(const char *const *action)
143 {
144   double clock = smpi_process_simulated_elapsed();
145
146   log_timed_action (action, clock);
147 }
148
149 static void action_compute(const char *const *action)
150 {
151   double clock = smpi_process_simulated_elapsed();
152   double flops= parse_double(action[2]);
153 #ifdef HAVE_TRACING
154   int rank = smpi_comm_rank(MPI_COMM_WORLD);
155   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
156   extra->type=TRACING_COMPUTING;
157   extra->comp_size=flops;
158   TRACE_smpi_computing_in(rank, extra);
159 #endif
160   smpi_execute_flops(flops);
161 #ifdef HAVE_TRACING
162   TRACE_smpi_computing_out(rank);
163 #endif
164
165   log_timed_action (action, clock);
166 }
167
168 static void action_send(const char *const *action)
169 {
170   int to = atoi(action[2]);
171   double size=parse_double(action[3]);
172   double clock = smpi_process_simulated_elapsed();
173
174   if(action[4]) {
175     MPI_CURRENT_TYPE=decode_datatype(action[4]);
176   } else {
177     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
178   }
179
180 #ifdef HAVE_TRACING
181   int rank = smpi_comm_rank(MPI_COMM_WORLD);
182
183   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
184   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
185   extra->type = TRACING_SEND;
186   extra->send_size = size;
187   extra->src = rank;
188   extra->dst = dst_traced;
189   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
190   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
191   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
192 #endif
193
194   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
195
196   log_timed_action (action, clock);
197
198   #ifdef HAVE_TRACING
199   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
200 #endif
201
202 }
203
204 static void action_Isend(const char *const *action)
205 {
206   int to = atoi(action[2]);
207   double size=parse_double(action[3]);
208   double clock = smpi_process_simulated_elapsed();
209   MPI_Request request;
210
211   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
212   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
213
214 #ifdef HAVE_TRACING
215   int rank = smpi_comm_rank(MPI_COMM_WORLD);
216   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
217   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
218   extra->type = TRACING_ISEND;
219   extra->send_size = size;
220   extra->src = rank;
221   extra->dst = dst_traced;
222   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
223   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
224   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
225 #endif
226
227   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
228
229 #ifdef HAVE_TRACING
230   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
231   request->send = 1;
232 #endif
233
234   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
235
236   log_timed_action (action, clock);
237 }
238
239 static void action_recv(const char *const *action) {
240   int from = atoi(action[2]);
241   double size=parse_double(action[3]);
242   double clock = smpi_process_simulated_elapsed();
243   MPI_Status status;
244
245   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
246   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
247
248 #ifdef HAVE_TRACING
249   int rank = smpi_comm_rank(MPI_COMM_WORLD);
250   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
251
252   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253   extra->type = TRACING_RECV;
254   extra->send_size = size;
255   extra->src = src_traced;
256   extra->dst = rank;
257   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
258   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
259 #endif
260
261   //unknow size from the receiver pov
262   if(size==-1){
263       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
264       size=status.count;
265   }
266
267   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
268
269 #ifdef HAVE_TRACING
270   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
271   TRACE_smpi_recv(rank, src_traced, rank);
272 #endif
273
274   log_timed_action (action, clock);
275 }
276
277 static void action_Irecv(const char *const *action)
278 {
279   int from = atoi(action[2]);
280   double size=parse_double(action[3]);
281   double clock = smpi_process_simulated_elapsed();
282   MPI_Request request;
283
284   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
285   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
286
287 #ifdef HAVE_TRACING
288   int rank = smpi_comm_rank(MPI_COMM_WORLD);
289   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
290   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291   extra->type = TRACING_IRECV;
292   extra->send_size = size;
293   extra->src = src_traced;
294   extra->dst = rank;
295   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
296   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
297 #endif
298   MPI_Status status;
299   //unknow size from the receiver pov
300   if(size==-1){
301       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
302       size=status.count;
303   }
304
305   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
306
307 #ifdef HAVE_TRACING
308   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
309   request->recv = 1;
310 #endif
311   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
312
313   log_timed_action (action, clock);
314 }
315
316 static void action_test(const char *const *action){
317   double clock = smpi_process_simulated_elapsed();
318   MPI_Request request;
319   MPI_Status status;
320   int flag = TRUE;
321
322   request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
323   xbt_assert(request != NULL, "found null request in reqq");
324
325 #ifdef HAVE_TRACING
326   int rank = smpi_comm_rank(MPI_COMM_WORLD);
327   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328   extra->type=TRACING_TEST;
329   TRACE_smpi_testing_in(rank, extra);
330 #endif
331   flag = smpi_mpi_test(&request, &status);
332   XBT_DEBUG("MPI_Test result: %d", flag);
333   /* push back request in dynar to be caught by a subsequent wait. if the test
334    * did succeed, the request is now NULL.
335    */
336   xbt_dynar_push_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request, request);
337
338 #ifdef HAVE_TRACING
339   TRACE_smpi_testing_out(rank);
340 #endif
341
342   log_timed_action (action, clock);
343 }
344
345 static void action_wait(const char *const *action){
346   double clock = smpi_process_simulated_elapsed();
347   MPI_Request request;
348   MPI_Status status;
349
350   xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
351       "action wait not preceded by any irecv or isend: %s",
352       xbt_str_join_array(action," "));
353   request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
354
355   if (!request){
356     /* Assuming that the trace is well formed, this mean the comm might have
357      * been caught by a MPI_test. Then just return.
358      */
359     return;
360   }
361
362 #ifdef HAVE_TRACING
363   int rank = request->comm != MPI_COMM_NULL
364       ? smpi_comm_rank(request->comm)
365       : -1;
366
367   MPI_Group group = smpi_comm_group(request->comm);
368   int src_traced = smpi_group_rank(group, request->src);
369   int dst_traced = smpi_group_rank(group, request->dst);
370   int is_wait_for_receive = request->recv;
371   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
372   extra->type = TRACING_WAIT;
373   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
374 #endif
375   smpi_mpi_wait(&request, &status);
376 #ifdef HAVE_TRACING
377   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
378   if (is_wait_for_receive) {
379     TRACE_smpi_recv(rank, src_traced, dst_traced);
380   }
381 #endif
382
383   log_timed_action (action, clock);
384 }
385
386 static void action_waitall(const char *const *action){
387   double clock = smpi_process_simulated_elapsed();
388   int count_requests=0;
389   unsigned int i=0;
390
391   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
392
393   if (count_requests>0) {
394     MPI_Request requests[count_requests];
395     MPI_Status status[count_requests];
396
397     /*  The reqq is an array of dynars. Its index corresponds to the rank.
398      Thus each rank saves its own requests to the array request. */
399     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
400
401   #ifdef HAVE_TRACING
402    //save information from requests
403
404    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
405    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
406    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
407    for (i = 0; i < count_requests; i++) {
408     if(requests[i]){
409       int *asrc = xbt_new(int, 1);
410       int *adst = xbt_new(int, 1);
411       int *arecv = xbt_new(int, 1);
412       *asrc = requests[i]->src;
413       *adst = requests[i]->dst;
414       *arecv = requests[i]->recv;
415       xbt_dynar_insert_at(srcs, i, asrc);
416       xbt_dynar_insert_at(dsts, i, adst);
417       xbt_dynar_insert_at(recvs, i, arecv);
418       xbt_free(asrc);
419       xbt_free(adst);
420       xbt_free(arecv);
421     }else {
422       int *t = xbt_new(int, 1);
423       xbt_dynar_insert_at(srcs, i, t);
424       xbt_dynar_insert_at(dsts, i, t);
425       xbt_dynar_insert_at(recvs, i, t);
426       xbt_free(t);
427     }
428    }
429    int rank_traced = smpi_process_index();
430    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
431    extra->type = TRACING_WAITALL;
432    extra->send_size=count_requests;
433    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
434  #endif
435
436     smpi_mpi_waitall(count_requests, requests, status);
437
438   #ifdef HAVE_TRACING
439    for (i = 0; i < count_requests; i++) {
440     int src_traced, dst_traced, is_wait_for_receive;
441     xbt_dynar_get_cpy(srcs, i, &src_traced);
442     xbt_dynar_get_cpy(dsts, i, &dst_traced);
443     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
444     if (is_wait_for_receive) {
445       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
446     }
447    }
448    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
449    //clean-up of dynars
450    xbt_dynar_free(&srcs);
451    xbt_dynar_free(&dsts);
452    xbt_dynar_free(&recvs);
453   #endif
454
455    xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
456   }
457   log_timed_action (action, clock);
458 }
459
460 static void action_barrier(const char *const *action){
461   double clock = smpi_process_simulated_elapsed();
462 #ifdef HAVE_TRACING
463   int rank = smpi_comm_rank(MPI_COMM_WORLD);
464   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
465   extra->type = TRACING_BARRIER;
466   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
467 #endif
468   smpi_mpi_barrier(MPI_COMM_WORLD);
469 #ifdef HAVE_TRACING
470   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
471 #endif
472
473   log_timed_action (action, clock);
474 }
475
476
477 static void action_bcast(const char *const *action)
478 {
479   double size = parse_double(action[2]);
480   double clock = smpi_process_simulated_elapsed();
481   int root=0;
482   /*
483    * Initialize MPI_CURRENT_TYPE in order to decrease
484    * the number of the checks
485    * */
486   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
487
488   if(action[3]) {
489     root= atoi(action[3]);
490     if(action[4]) {
491       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
492     }
493   }
494
495 #ifdef HAVE_TRACING
496   int rank = smpi_comm_rank(MPI_COMM_WORLD);
497   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
498
499   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
500   extra->type = TRACING_BCAST;
501   extra->send_size = size;
502   extra->root = root_traced;
503   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
504   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
505
506 #endif
507
508   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
509 #ifdef HAVE_TRACING
510   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
511 #endif
512
513   log_timed_action (action, clock);
514 }
515
516 static void action_reduce(const char *const *action)
517 {
518   double comm_size = parse_double(action[2]);
519   double comp_size = parse_double(action[3]);
520   double clock = smpi_process_simulated_elapsed();
521   int root=0;
522   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
523
524   if(action[4]) {
525     root= atoi(action[4]);
526     if(action[5]) {
527       MPI_CURRENT_TYPE=decode_datatype(action[5]);
528     }
529   }
530
531 #ifdef HAVE_TRACING
532   int rank = smpi_comm_rank(MPI_COMM_WORLD);
533   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
534   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
535   extra->type = TRACING_REDUCE;
536   extra->send_size = comm_size;
537   extra->comp_size = comp_size;
538   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
539   extra->root = root_traced;
540
541   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
542 #endif
543    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
544    smpi_execute_flops(comp_size);
545 #ifdef HAVE_TRACING
546   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
547 #endif
548
549   log_timed_action (action, clock);
550 }
551
552 static void action_allReduce(const char *const *action) {
553   double comm_size = parse_double(action[2]);
554   double comp_size = parse_double(action[3]);
555
556   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
557   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
558
559   double clock = smpi_process_simulated_elapsed();
560 #ifdef HAVE_TRACING
561   int rank = smpi_comm_rank(MPI_COMM_WORLD);
562   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
563   extra->type = TRACING_ALLREDUCE;
564   extra->send_size = comm_size;
565   extra->comp_size = comp_size;
566   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
567
568   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
569 #endif
570   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
571   smpi_execute_flops(comp_size);
572   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
573 #ifdef HAVE_TRACING
574   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
575 #endif
576
577   log_timed_action (action, clock);
578 }
579
580 static void action_allToAll(const char *const *action) {
581   double clock = smpi_process_simulated_elapsed();
582   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
583   int send_size = parse_double(action[2]);
584   int recv_size = parse_double(action[3]);
585   MPI_Datatype MPI_CURRENT_TYPE2;
586
587   if(action[4]) {
588     MPI_CURRENT_TYPE=decode_datatype(action[4]);
589     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
590   }
591   else {
592     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
593     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
594   }
595   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
596   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
597
598 #ifdef HAVE_TRACING
599   int rank = smpi_process_index();
600   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
601   extra->type = TRACING_ALLTOALL;
602   extra->send_size = send_size;
603   extra->recv_size = recv_size;
604   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
605   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
606
607   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
608 #endif
609
610   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
611
612 #ifdef HAVE_TRACING
613   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
614 #endif
615
616   log_timed_action (action, clock);
617   xbt_free(send);
618   xbt_free(recv);
619 }
620
621
622 static void action_gather(const char *const *action) {
623   /*
624  The structure of the gather action for the rank 0 (total 4 processes) 
625  is the following:   
626  0 gather 68 68 0 0 0
627
628   where: 
629   1) 68 is the sendcounts
630   2) 68 is the recvcounts
631   3) 0 is the root node
632   4) 0 is the send datatype id, see decode_datatype()
633   5) 0 is the recv datatype id, see decode_datatype()
634   */
635   double clock = smpi_process_simulated_elapsed();
636   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
637   int send_size = parse_double(action[2]);
638   int recv_size = parse_double(action[3]);
639   MPI_Datatype MPI_CURRENT_TYPE2;
640   if(action[5]) {
641     MPI_CURRENT_TYPE=decode_datatype(action[5]);
642     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
643   } else {
644     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
645     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
646   }
647   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
648   void *recv = NULL;
649
650   int root=atoi(action[4]);
651   int rank = smpi_process_index();
652
653   if(rank==root)
654     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
655
656 #ifdef HAVE_TRACING
657   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
658   extra->type = TRACING_GATHER;
659   extra->send_size = send_size;
660   extra->recv_size = recv_size;
661   extra->root = root;
662   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
663   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
664
665   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
666 #endif
667 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
668                 recv, recv_size, MPI_CURRENT_TYPE2,
669                 root, MPI_COMM_WORLD);
670
671 #ifdef HAVE_TRACING
672   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
673 #endif
674
675   log_timed_action (action, clock);
676   xbt_free(send);
677   xbt_free(recv);
678 }
679
680
681
682 static void action_gatherv(const char *const *action) {
683   /*
684  The structure of the gatherv action for the rank 0 (total 4 processes)
685  is the following:
686  0 gather 68 68 10 10 10 0 0 0
687
688   where:
689   1) 68 is the sendcount
690   2) 68 10 10 10 is the recvcounts
691   3) 0 is the root node
692   4) 0 is the send datatype id, see decode_datatype()
693   5) 0 is the recv datatype id, see decode_datatype()
694   */
695   double clock = smpi_process_simulated_elapsed();
696   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
697   int send_size = parse_double(action[2]);
698   int *disps = xbt_new0(int, comm_size);
699   int *recvcounts = xbt_new0(int, comm_size);
700   int i=0,recv_sum=0;
701
702   MPI_Datatype MPI_CURRENT_TYPE2;
703   if(action[4+comm_size]) {
704     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
705     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
706   } else {
707     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
708     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
709   }
710   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
711   void *recv = NULL;
712   for(i=0;i<comm_size;i++) {
713     recvcounts[i] = atoi(action[i+3]);
714     recv_sum=recv_sum+recvcounts[i];
715     disps[i] = 0;
716   }
717
718   int root=atoi(action[3+comm_size]);
719   int rank = smpi_process_index();
720
721   if(rank==root)
722     recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
723
724 #ifdef HAVE_TRACING
725   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
726   extra->type = TRACING_GATHERV;
727   extra->send_size = send_size;
728   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
729   for(i=0; i< comm_size; i++)//copy data to avoid bad free
730     extra->recvcounts[i] = recvcounts[i];
731   extra->root = root;
732   extra->num_processes = comm_size;
733   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
734   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
735
736   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
737 #endif
738 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
739                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
740                 root, MPI_COMM_WORLD);
741
742 #ifdef HAVE_TRACING
743   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
744 #endif
745
746   log_timed_action (action, clock);
747   xbt_free(recvcounts);
748   xbt_free(send);
749   xbt_free(recv);
750   xbt_free(disps);
751
752 }
753
754 static void action_reducescatter(const char *const *action) {
755
756     /*
757  The structure of the reducescatter action for the rank 0 (total 4 processes) 
758  is the following:   
759 0 reduceScatter 275427 275427 275427 204020 11346849 0
760
761   where: 
762   1) The first four values after the name of the action declare the recvcounts array
763   2) The value 11346849 is the amount of instructions
764   3) The last value corresponds to the datatype, see decode_datatype().
765
766   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
767
768    */
769
770   double clock = smpi_process_simulated_elapsed();
771   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
772   int comp_size = parse_double(action[2+comm_size]);
773   int *recvcounts = xbt_new0(int, comm_size);  
774   int *disps = xbt_new0(int, comm_size);  
775   int i=0,recv_sum=0;
776   int root=0;
777   int rank = smpi_process_index();
778
779   if(action[3+comm_size])
780     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
781   else
782     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
783
784   for(i=0;i<comm_size;i++) {
785     recvcounts[i] = atoi(action[i+2]);
786     recv_sum=recv_sum+recvcounts[i];
787     disps[i] = 0;
788   }
789
790 #ifdef HAVE_TRACING
791   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
792   extra->type = TRACING_REDUCE_SCATTER;
793   extra->send_size = 0;
794   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
795   for(i=0; i< comm_size; i++)//copy data to avoid bad free
796     extra->recvcounts[i] = recvcounts[i];
797   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
798   extra->comp_size = comp_size;
799   extra->num_processes = comm_size;
800
801
802   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
803 #endif
804    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
805        root, MPI_COMM_WORLD);
806    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
807                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
808    smpi_execute_flops(comp_size);
809
810
811 #ifdef HAVE_TRACING
812   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
813 #endif
814   xbt_free(recvcounts);
815   xbt_free(disps);
816   log_timed_action (action, clock);
817 }
818
819
820 static void action_allgatherv(const char *const *action) {
821
822   /*
823  The structure of the allgatherv action for the rank 0 (total 4 processes) 
824  is the following:   
825 0 allGatherV 275427 275427 275427 275427 204020
826
827   where: 
828   1) 275427 is the sendcount
829   2) The next four elements declare the recvcounts array
830   3) No more values mean that the datatype for sent and receive buffer
831   is the default one, see decode_datatype().
832
833    */
834
835   double clock = smpi_process_simulated_elapsed();
836
837   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
838   int i=0;
839   int sendcount=atoi(action[2]);
840   int *recvcounts = xbt_new0(int, comm_size);  
841   int *disps = xbt_new0(int, comm_size);  
842   int recv_sum=0;  
843   MPI_Datatype MPI_CURRENT_TYPE2;
844
845   if(action[3+comm_size]) {
846     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
847     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
848   } else {
849     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
850     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
851   }
852   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
853
854   for(i=0;i<comm_size;i++) {
855     recvcounts[i] = atoi(action[i+3]);
856     recv_sum=recv_sum+recvcounts[i];
857   }
858   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
859
860 #ifdef HAVE_TRACING
861   int rank = smpi_process_index();
862   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
863   extra->type = TRACING_ALLGATHERV;
864   extra->send_size = sendcount;
865   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
866   for(i=0; i< comm_size; i++)//copy data to avoid bad free
867     extra->recvcounts[i] = recvcounts[i];
868   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
869   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
870   extra->num_processes = comm_size;
871
872   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
873 #endif
874
875 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
876
877 #ifdef HAVE_TRACING
878   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
879 #endif
880
881   log_timed_action (action, clock);
882   xbt_free(sendbuf);
883   xbt_free(recvbuf);
884   xbt_free(recvcounts);
885   xbt_free(disps);
886 }
887
888
889 static void action_allToAllv(const char *const *action) {
890   /*
891  The structure of the allToAllV action for the rank 0 (total 4 processes) 
892  is the following:   
893   0 allToAllV 100 1 7 10 12 100 1 70 10 5
894
895   where: 
896   1) 100 is the size of the send buffer *sizeof(int),
897   2) 1 7 10 12 is the sendcounts array
898   3) 100*sizeof(int) is the size of the receiver buffer
899   4)  1 70 10 5 is the recvcounts array
900
901    */
902
903
904   double clock = smpi_process_simulated_elapsed();
905
906   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
907   int send_buf_size=0,recv_buf_size=0,i=0;
908   int *sendcounts = xbt_new0(int, comm_size);  
909   int *recvcounts = xbt_new0(int, comm_size);  
910   int *senddisps = xbt_new0(int, comm_size);  
911   int *recvdisps = xbt_new0(int, comm_size);  
912
913   MPI_Datatype MPI_CURRENT_TYPE2;
914
915   send_buf_size=parse_double(action[2]);
916   recv_buf_size=parse_double(action[3+comm_size]);
917   if(action[4+2*comm_size]) {
918     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
919     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
920   }
921   else {
922       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
923       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
924   }
925
926   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
927   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
928
929   for(i=0;i<comm_size;i++) {
930     sendcounts[i] = atoi(action[i+3]);
931     recvcounts[i] = atoi(action[i+4+comm_size]);
932   }
933
934
935 #ifdef HAVE_TRACING
936   int rank = smpi_process_index();
937   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
938   extra->type = TRACING_ALLTOALLV;
939   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
940   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
941   extra->num_processes = comm_size;
942
943   for(i=0; i< comm_size; i++){//copy data to avoid bad free
944     extra->send_size += sendcounts[i];
945     extra->sendcounts[i] = sendcounts[i];
946     extra->recv_size += recvcounts[i];
947     extra->recvcounts[i] = recvcounts[i];
948   }
949   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
950   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
951
952   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
953 #endif
954     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
955                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
956                                MPI_COMM_WORLD);
957 #ifdef HAVE_TRACING
958   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
959 #endif
960
961   log_timed_action (action, clock);
962   xbt_free(sendbuf);
963   xbt_free(recvbuf);
964   xbt_free(sendcounts);
965   xbt_free(recvcounts);
966   xbt_free(senddisps);
967   xbt_free(recvdisps);
968 }
969
970 void smpi_replay_init(int *argc, char***argv){
971   smpi_process_init(argc, argv);
972   smpi_process_mark_as_initialized();
973 #ifdef HAVE_TRACING
974   int rank = smpi_process_index();
975   TRACE_smpi_init(rank);
976   TRACE_smpi_computing_init(rank);
977   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
978   extra->type = TRACING_INIT;
979   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
980   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
981 #endif
982
983   if (!smpi_process_index()){
984     _xbt_replay_action_init();
985     xbt_replay_action_register("init",       action_init);
986     xbt_replay_action_register("finalize",   action_finalize);
987     xbt_replay_action_register("comm_size",  action_comm_size);
988     xbt_replay_action_register("comm_split", action_comm_split);
989     xbt_replay_action_register("comm_dup",   action_comm_dup);
990     xbt_replay_action_register("send",       action_send);
991     xbt_replay_action_register("Isend",      action_Isend);
992     xbt_replay_action_register("recv",       action_recv);
993     xbt_replay_action_register("Irecv",      action_Irecv);
994     xbt_replay_action_register("test",       action_test);
995     xbt_replay_action_register("wait",       action_wait);
996     xbt_replay_action_register("waitAll",    action_waitall);
997     xbt_replay_action_register("barrier",    action_barrier);
998     xbt_replay_action_register("bcast",      action_bcast);
999     xbt_replay_action_register("reduce",     action_reduce);
1000     xbt_replay_action_register("allReduce",  action_allReduce);
1001     xbt_replay_action_register("allToAll",   action_allToAll);
1002     xbt_replay_action_register("allToAllV",  action_allToAllv);
1003     xbt_replay_action_register("gather",  action_gather);
1004     xbt_replay_action_register("gatherV",  action_gatherv);
1005     xbt_replay_action_register("allGatherV",  action_allgatherv);
1006     xbt_replay_action_register("reduceScatter",  action_reducescatter);
1007     xbt_replay_action_register("compute",    action_compute);
1008   }
1009
1010   xbt_replay_action_runner(*argc, *argv);
1011 }
1012
1013 int smpi_replay_finalize(){
1014   double sim_time= 1.;
1015   /* One active process will stop. Decrease the counter*/
1016   XBT_DEBUG("There are %lu elements in reqq[*]",
1017             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
1018   if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
1019     int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
1020     MPI_Request requests[count_requests];
1021     MPI_Status status[count_requests];
1022     unsigned int i;
1023
1024     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
1025     smpi_mpi_waitall(count_requests, requests, status);
1026     active_processes--;
1027   } else {
1028     active_processes--;
1029   }
1030
1031   xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
1032
1033   if(!active_processes){
1034     /* Last process alive speaking */
1035     /* end the simulated timer */
1036     sim_time = smpi_process_simulated_elapsed();
1037     XBT_INFO("Simulation time %f", sim_time);
1038     _xbt_replay_action_exit();
1039     xbt_free(reqq);
1040     reqq = NULL;
1041   }
1042   smpi_mpi_barrier(MPI_COMM_WORLD);
1043 #ifdef HAVE_TRACING
1044   int rank = smpi_process_index();
1045   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1046   extra->type = TRACING_FINALIZE;
1047   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1048 #endif
1049   smpi_process_finalize();
1050 #ifdef HAVE_TRACING
1051   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1052   TRACE_smpi_finalize(smpi_process_index());
1053 #endif
1054   smpi_process_destroy();
1055   return MPI_SUCCESS;
1056 }