Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
attempt to get rid of all const_cast (take 2)
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/smpi/private.h"
7 #include "src/smpi/smpi_coll.hpp"
8 #include "src/smpi/smpi_comm.hpp"
9 #include "src/smpi/smpi_datatype.hpp"
10 #include "src/smpi/smpi_group.hpp"
11 #include "src/smpi/smpi_process.hpp"
12 #include "src/smpi/smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 int communicator_size = 0;
23 static int active_processes = 0;
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
25
26 MPI_Datatype MPI_DEFAULT_TYPE;
27 MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(smpi_process()->index());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({smpi_process()->index(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (!smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (!smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (!smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90 static MPI_Datatype decode_datatype(const char *const action)
91 {
92   switch(atoi(action)) {
93     case 0:
94       MPI_CURRENT_TYPE=MPI_DOUBLE;
95       break;
96     case 1:
97       MPI_CURRENT_TYPE=MPI_INT;
98       break;
99     case 2:
100       MPI_CURRENT_TYPE=MPI_CHAR;
101       break;
102     case 3:
103       MPI_CURRENT_TYPE=MPI_SHORT;
104       break;
105     case 4:
106       MPI_CURRENT_TYPE=MPI_LONG;
107       break;
108     case 5:
109       MPI_CURRENT_TYPE=MPI_FLOAT;
110       break;
111     case 6:
112       MPI_CURRENT_TYPE=MPI_BYTE;
113       break;
114     default:
115       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
116   }
117    return MPI_CURRENT_TYPE;
118 }
119
120 const char* encode_datatype(MPI_Datatype datatype, int* known)
121 {
122   //default type for output is set to MPI_BYTE
123   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
124   if(known!=nullptr)
125     *known=1;
126   if (datatype==MPI_BYTE)
127       return "";
128   if(datatype==MPI_DOUBLE)
129       return "0";
130   if(datatype==MPI_INT)
131       return "1";
132   if(datatype==MPI_CHAR)
133       return "2";
134   if(datatype==MPI_SHORT)
135       return "3";
136   if(datatype==MPI_LONG)
137     return "4";
138   if(datatype==MPI_FLOAT)
139       return "5";
140   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
141   if(known!=nullptr)
142     *known=0;
143   // default - not implemented.
144   // do not warn here as we pass in this function even for other trace formats
145   return "-1";
146 }
147
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
149     int i=0;\
150     while(action[i]!=nullptr)\
151      i++;\
152     if(i<mandatory+2)                                           \
153     THROWF(arg_error, 0, "%s replay failed.\n" \
154           "%d items were given on the line. First two should be process_id and action.  " \
155           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
157   }
158
159 namespace simgrid {
160 namespace smpi {
161
162 static void action_init(const char *const *action)
163 {
164   XBT_DEBUG("Initialize the counters");
165   CHECK_ACTION_PARAMS(action, 0, 1)
166   if(action[2]) 
167     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
168   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
169
170   /* start a simulated timer */
171   smpi_process()->simulated_start();
172   /*initialize the number of active processes */
173   active_processes = smpi_process_count();
174
175   set_reqq_self(new std::vector<MPI_Request>);
176 }
177
178 static void action_finalize(const char *const *action)
179 {
180   /* Nothing to do */
181 }
182
183 static void action_comm_size(const char *const *action)
184 {
185   communicator_size = parse_double(action[2]);
186   log_timed_action (action, smpi_process()->simulated_elapsed());
187 }
188
189 static void action_comm_split(const char *const *action)
190 {
191   log_timed_action (action, smpi_process()->simulated_elapsed());
192 }
193
194 static void action_comm_dup(const char *const *action)
195 {
196   log_timed_action (action, smpi_process()->simulated_elapsed());
197 }
198
199 static void action_compute(const char *const *action)
200 {
201   CHECK_ACTION_PARAMS(action, 1, 0)
202   double clock = smpi_process()->simulated_elapsed();
203   double flops= parse_double(action[2]);
204   int rank = smpi_process()->index();
205   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
206   extra->type=TRACING_COMPUTING;
207   extra->comp_size=flops;
208   TRACE_smpi_computing_in(rank, extra);
209
210   smpi_execute_flops(flops);
211
212   TRACE_smpi_computing_out(rank);
213   log_timed_action (action, clock);
214 }
215
216 static void action_send(const char *const *action)
217 {
218   CHECK_ACTION_PARAMS(action, 2, 1)
219   int to = atoi(action[2]);
220   double size=parse_double(action[3]);
221   double clock = smpi_process()->simulated_elapsed();
222
223   if(action[4])
224     MPI_CURRENT_TYPE=decode_datatype(action[4]);
225   else
226     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
227
228   int rank = smpi_process()->index();
229
230   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
231   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
232   extra->type = TRACING_SEND;
233   extra->send_size = size;
234   extra->src = rank;
235   extra->dst = dst_traced;
236   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
237   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
238   if (!TRACE_smpi_view_internals())
239     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
240
241   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
242
243   log_timed_action (action, clock);
244
245   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
246 }
247
248 static void action_Isend(const char *const *action)
249 {
250   CHECK_ACTION_PARAMS(action, 2, 1)
251   int to = atoi(action[2]);
252   double size=parse_double(action[3]);
253   double clock = smpi_process()->simulated_elapsed();
254
255   if(action[4]) 
256     MPI_CURRENT_TYPE=decode_datatype(action[4]);
257   else 
258     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
259
260   int rank = smpi_process()->index();
261   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
262   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
263   extra->type = TRACING_ISEND;
264   extra->send_size = size;
265   extra->src = rank;
266   extra->dst = dst_traced;
267   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
268   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
269   if (!TRACE_smpi_view_internals())
270     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
271
272   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
273
274   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
275
276   get_reqq_self()->push_back(request);
277
278   log_timed_action (action, clock);
279 }
280
281 static void action_recv(const char *const *action) {
282   CHECK_ACTION_PARAMS(action, 2, 1)
283   int from = atoi(action[2]);
284   double size=parse_double(action[3]);
285   double clock = smpi_process()->simulated_elapsed();
286   MPI_Status status;
287
288   if(action[4]) 
289     MPI_CURRENT_TYPE=decode_datatype(action[4]);
290   else 
291     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
292
293   int rank = smpi_process()->index();
294   int src_traced = MPI_COMM_WORLD->group()->rank(from);
295
296   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
297   extra->type = TRACING_RECV;
298   extra->send_size = size;
299   extra->src = src_traced;
300   extra->dst = rank;
301   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
302   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
303
304   //unknown size from the receiver point of view
305   if(size<=0.0){
306     Request::probe(from, 0, MPI_COMM_WORLD, &status);
307     size=status.count;
308   }
309
310   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
311
312   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
313   if (!TRACE_smpi_view_internals()) {
314     TRACE_smpi_recv(rank, src_traced, rank, 0);
315   }
316
317   log_timed_action (action, clock);
318 }
319
320 static void action_Irecv(const char *const *action)
321 {
322   CHECK_ACTION_PARAMS(action, 2, 1)
323   int from = atoi(action[2]);
324   double size=parse_double(action[3]);
325   double clock = smpi_process()->simulated_elapsed();
326
327   if(action[4]) 
328     MPI_CURRENT_TYPE=decode_datatype(action[4]);
329   else 
330     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
331
332   int rank = smpi_process()->index();
333   int src_traced = MPI_COMM_WORLD->group()->rank(from);
334   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
335   extra->type = TRACING_IRECV;
336   extra->send_size = size;
337   extra->src = src_traced;
338   extra->dst = rank;
339   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
340   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
341   MPI_Status status;
342   //unknow size from the receiver pov
343   if(size<=0.0){
344       Request::probe(from, 0, MPI_COMM_WORLD, &status);
345       size=status.count;
346   }
347
348   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
349
350   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
351   get_reqq_self()->push_back(request);
352
353   log_timed_action (action, clock);
354 }
355
356 static void action_test(const char *const *action){
357   CHECK_ACTION_PARAMS(action, 0, 0)
358   double clock = smpi_process()->simulated_elapsed();
359   MPI_Status status;
360
361   MPI_Request request = get_reqq_self()->back();
362   get_reqq_self()->pop_back();
363   //if request is null here, this may mean that a previous test has succeeded 
364   //Different times in traced application and replayed version may lead to this 
365   //In this case, ignore the extra calls.
366   if(request!=nullptr){
367     int rank = smpi_process()->index();
368     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
369     extra->type=TRACING_TEST;
370     TRACE_smpi_testing_in(rank, extra);
371
372     int flag = Request::test(&request, &status);
373
374     XBT_DEBUG("MPI_Test result: %d", flag);
375     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
376     get_reqq_self()->push_back(request);
377
378     TRACE_smpi_testing_out(rank);
379   }
380   log_timed_action (action, clock);
381 }
382
383 static void action_wait(const char *const *action){
384   CHECK_ACTION_PARAMS(action, 0, 0)
385   double clock = smpi_process()->simulated_elapsed();
386   MPI_Status status;
387
388   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
389       xbt_str_join_array(action," "));
390   MPI_Request request = get_reqq_self()->back();
391   get_reqq_self()->pop_back();
392
393   if (request==nullptr){
394     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
395     return;
396   }
397
398   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
399
400   MPI_Group group = request->comm()->group();
401   int src_traced = group->rank(request->src());
402   int dst_traced = group->rank(request->dst());
403   int is_wait_for_receive = (request->flags() & RECV);
404   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
405   extra->type = TRACING_WAIT;
406   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
407
408   Request::wait(&request, &status);
409
410   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
411   if (is_wait_for_receive)
412     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
413   log_timed_action (action, clock);
414 }
415
416 static void action_waitall(const char *const *action){
417   CHECK_ACTION_PARAMS(action, 0, 0)
418   double clock = smpi_process()->simulated_elapsed();
419   unsigned int count_requests=get_reqq_self()->size();
420
421   if (count_requests>0) {
422     MPI_Status status[count_requests];
423
424    int rank_traced = smpi_process()->index();
425    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
426    extra->type = TRACING_WAITALL;
427    extra->send_size=count_requests;
428    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
429    int recvs_snd[count_requests];
430    int recvs_rcv[count_requests];
431    unsigned int i=0;
432    for (auto req : *(get_reqq_self())){
433      if (req && (req->flags () & RECV)){
434        recvs_snd[i]=req->src();
435        recvs_rcv[i]=req->dst();
436      }else
437        recvs_snd[i]=-100;
438      i++;
439    }
440    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
441
442    for (i=0; i<count_requests;i++){
443      if (recvs_snd[i]!=-100)
444        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
445    }
446    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
447   }
448   log_timed_action (action, clock);
449 }
450
451 static void action_barrier(const char *const *action){
452   double clock = smpi_process()->simulated_elapsed();
453   int rank = smpi_process()->index();
454   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
455   extra->type = TRACING_BARRIER;
456   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
457
458   Colls::barrier(MPI_COMM_WORLD);
459
460   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
461   log_timed_action (action, clock);
462 }
463
464 static void action_bcast(const char *const *action)
465 {
466   CHECK_ACTION_PARAMS(action, 1, 2)
467   double size = parse_double(action[2]);
468   double clock = smpi_process()->simulated_elapsed();
469   int root=0;
470   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
471   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
472
473   if(action[3]) {
474     root= atoi(action[3]);
475     if(action[4])
476       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
477   }
478
479   int rank = smpi_process()->index();
480   int root_traced = MPI_COMM_WORLD->group()->index(root);
481
482   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
483   extra->type = TRACING_BCAST;
484   extra->send_size = size;
485   extra->root = root_traced;
486   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
487   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
488   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
489
490   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
491
492   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
493   log_timed_action (action, clock);
494 }
495
496 static void action_reduce(const char *const *action)
497 {
498   CHECK_ACTION_PARAMS(action, 2, 2)
499   double comm_size = parse_double(action[2]);
500   double comp_size = parse_double(action[3]);
501   double clock = smpi_process()->simulated_elapsed();
502   int root=0;
503   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
504
505   if(action[4]) {
506     root= atoi(action[4]);
507     if(action[5])
508       MPI_CURRENT_TYPE=decode_datatype(action[5]);
509   }
510
511   int rank = smpi_process()->index();
512   int root_traced = MPI_COMM_WORLD->group()->rank(root);
513   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
514   extra->type = TRACING_REDUCE;
515   extra->send_size = comm_size;
516   extra->comp_size = comp_size;
517   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
518   extra->root = root_traced;
519
520   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
521
522   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
523   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
524   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
525   smpi_execute_flops(comp_size);
526
527   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
528   log_timed_action (action, clock);
529 }
530
531 static void action_allReduce(const char *const *action) {
532   CHECK_ACTION_PARAMS(action, 2, 1)
533   double comm_size = parse_double(action[2]);
534   double comp_size = parse_double(action[3]);
535
536   if(action[4])
537     MPI_CURRENT_TYPE=decode_datatype(action[4]);
538   else
539     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
540
541   double clock = smpi_process()->simulated_elapsed();
542   int rank = smpi_process()->index();
543   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
544   extra->type = TRACING_ALLREDUCE;
545   extra->send_size = comm_size;
546   extra->comp_size = comp_size;
547   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
548   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
549
550   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
551   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
552   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
553   smpi_execute_flops(comp_size);
554
555   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
556   log_timed_action (action, clock);
557 }
558
559 static void action_allToAll(const char *const *action) {
560   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
561   double clock = smpi_process()->simulated_elapsed();
562   int comm_size = MPI_COMM_WORLD->size();
563   int send_size = parse_double(action[2]);
564   int recv_size = parse_double(action[3]);
565   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
566
567   if(action[4] && action[5]) {
568     MPI_CURRENT_TYPE=decode_datatype(action[4]);
569     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
570   }
571   else
572     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
573
574   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
575   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
576
577   int rank = smpi_process()->index();
578   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
579   extra->type = TRACING_ALLTOALL;
580   extra->send_size = send_size;
581   extra->recv_size = recv_size;
582   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
583   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
584
585   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
586
587   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
588
589   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
590   log_timed_action (action, clock);
591 }
592
593 static void action_gather(const char *const *action) {
594   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
595         0 gather 68 68 0 0 0
596       where:
597         1) 68 is the sendcounts
598         2) 68 is the recvcounts
599         3) 0 is the root node
600         4) 0 is the send datatype id, see decode_datatype()
601         5) 0 is the recv datatype id, see decode_datatype()
602   */
603   CHECK_ACTION_PARAMS(action, 2, 3)
604   double clock = smpi_process()->simulated_elapsed();
605   int comm_size = MPI_COMM_WORLD->size();
606   int send_size = parse_double(action[2]);
607   int recv_size = parse_double(action[3]);
608   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
609   if(action[4] && action[5]) {
610     MPI_CURRENT_TYPE=decode_datatype(action[5]);
611     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
612   } else {
613     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
614   }
615   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
616   void *recv = nullptr;
617   int root=0;
618   if(action[4])
619     root=atoi(action[4]);
620   int rank = MPI_COMM_WORLD->rank();
621
622   if(rank==root)
623     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
624
625   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
626   extra->type = TRACING_GATHER;
627   extra->send_size = send_size;
628   extra->recv_size = recv_size;
629   extra->root = root;
630   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
631   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
632
633   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
634
635   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
636
637   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
638   log_timed_action (action, clock);
639 }
640
641 static void action_gatherv(const char *const *action) {
642   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
643        0 gather 68 68 10 10 10 0 0 0
644      where:
645        1) 68 is the sendcount
646        2) 68 10 10 10 is the recvcounts
647        3) 0 is the root node
648        4) 0 is the send datatype id, see decode_datatype()
649        5) 0 is the recv datatype id, see decode_datatype()
650   */
651   double clock = smpi_process()->simulated_elapsed();
652   int comm_size = MPI_COMM_WORLD->size();
653   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
654   int send_size = parse_double(action[2]);
655   int disps[comm_size];
656   int recvcounts[comm_size];
657   int recv_sum=0;
658
659   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
660   if(action[4+comm_size] && action[5+comm_size]) {
661     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
662     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
663   } else
664     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
665
666   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
667   void *recv = nullptr;
668   for(int i=0;i<comm_size;i++) {
669     recvcounts[i] = atoi(action[i+3]);
670     recv_sum=recv_sum+recvcounts[i];
671     disps[i]=0;
672   }
673
674   int root=atoi(action[3+comm_size]);
675   int rank = MPI_COMM_WORLD->rank();
676
677   if(rank==root)
678     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
679
680   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
681   extra->type = TRACING_GATHERV;
682   extra->send_size = send_size;
683   extra->recvcounts= xbt_new(int,comm_size);
684   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
685     extra->recvcounts[i] = recvcounts[i];
686   extra->root = root;
687   extra->num_processes = comm_size;
688   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
689   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
690
691   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
692
693   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
694
695   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
696   log_timed_action (action, clock);
697 }
698
699 static void action_reducescatter(const char *const *action) {
700  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
701       0 reduceScatter 275427 275427 275427 204020 11346849 0
702     where:
703       1) The first four values after the name of the action declare the recvcounts array
704       2) The value 11346849 is the amount of instructions
705       3) The last value corresponds to the datatype, see decode_datatype().
706 */
707   double clock = smpi_process()->simulated_elapsed();
708   int comm_size = MPI_COMM_WORLD->size();
709   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
710   int comp_size = parse_double(action[2+comm_size]);
711   int recvcounts[comm_size];
712   int rank = smpi_process()->index();
713   int size = 0;
714   if(action[3+comm_size])
715     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
716   else
717     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
718
719   for(int i=0;i<comm_size;i++) {
720     recvcounts[i] = atoi(action[i+2]);
721     size+=recvcounts[i];
722   }
723
724   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
725   extra->type = TRACING_REDUCE_SCATTER;
726   extra->send_size = 0;
727   extra->recvcounts= xbt_new(int, comm_size);
728   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
729     extra->recvcounts[i] = recvcounts[i];
730   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
731   extra->comp_size = comp_size;
732   extra->num_processes = comm_size;
733
734   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
735
736   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
737   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
738
739   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
740   smpi_execute_flops(comp_size);
741
742   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
743   log_timed_action (action, clock);
744 }
745
746 static void action_allgather(const char *const *action) {
747   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
748         0 allGather 275427 275427
749     where:
750         1) 275427 is the sendcount
751         2) 275427 is the recvcount
752         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
753   */
754   double clock = smpi_process()->simulated_elapsed();
755
756   CHECK_ACTION_PARAMS(action, 2, 2)
757   int sendcount=atoi(action[2]); 
758   int recvcount=atoi(action[3]); 
759
760   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
761
762   if(action[4] && action[5]) {
763     MPI_CURRENT_TYPE = decode_datatype(action[4]);
764     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
765   } else
766     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
767
768   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
769   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
770
771   int rank = smpi_process()->index();
772   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
773   extra->type = TRACING_ALLGATHER;
774   extra->send_size = sendcount;
775   extra->recv_size= recvcount;
776   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
777   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
778   extra->num_processes = MPI_COMM_WORLD->size();
779
780   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
781
782   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
783
784   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
785   log_timed_action (action, clock);
786 }
787
788 static void action_allgatherv(const char *const *action) {
789   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
790         0 allGatherV 275427 275427 275427 275427 204020
791      where:
792         1) 275427 is the sendcount
793         2) The next four elements declare the recvcounts array
794         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
795   */
796   double clock = smpi_process()->simulated_elapsed();
797
798   int comm_size = MPI_COMM_WORLD->size();
799   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
800   int sendcount=atoi(action[2]);
801   int recvcounts[comm_size];
802   int disps[comm_size];
803   int recv_sum=0;
804   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
805
806   if(action[3+comm_size] && action[4+comm_size]) {
807     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
808     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
809   } else
810     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
811
812   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
813
814   for(int i=0;i<comm_size;i++) {
815     recvcounts[i] = atoi(action[i+3]);
816     recv_sum=recv_sum+recvcounts[i];
817     disps[i] = 0;
818   }
819   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
820
821   int rank = smpi_process()->index();
822   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
823   extra->type = TRACING_ALLGATHERV;
824   extra->send_size = sendcount;
825   extra->recvcounts= xbt_new(int, comm_size);
826   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
827     extra->recvcounts[i] = recvcounts[i];
828   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
829   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
830   extra->num_processes = comm_size;
831
832   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
833
834   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
835                           MPI_COMM_WORLD);
836
837   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
838   log_timed_action (action, clock);
839 }
840
841 static void action_allToAllv(const char *const *action) {
842   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
843         0 allToAllV 100 1 7 10 12 100 1 70 10 5
844      where:
845         1) 100 is the size of the send buffer *sizeof(int),
846         2) 1 7 10 12 is the sendcounts array
847         3) 100*sizeof(int) is the size of the receiver buffer
848         4)  1 70 10 5 is the recvcounts array
849   */
850   double clock = smpi_process()->simulated_elapsed();
851
852   int comm_size = MPI_COMM_WORLD->size();
853   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
854   int sendcounts[comm_size];
855   int recvcounts[comm_size];
856   int senddisps[comm_size];
857   int recvdisps[comm_size];
858
859   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
860
861   int send_buf_size=parse_double(action[2]);
862   int recv_buf_size=parse_double(action[3+comm_size]);
863   if(action[4+2*comm_size] && action[5+2*comm_size]) {
864     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
865     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
866   }
867   else
868     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
869
870   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
871   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
872
873   for(int i=0;i<comm_size;i++) {
874     sendcounts[i] = atoi(action[i+3]);
875     recvcounts[i] = atoi(action[i+4+comm_size]);
876     senddisps[i] = 0;
877     recvdisps[i] = 0;
878   }
879
880   int rank = smpi_process()->index();
881   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
882   extra->type = TRACING_ALLTOALLV;
883   extra->recvcounts= xbt_new(int, comm_size);
884   extra->sendcounts= xbt_new(int, comm_size);
885   extra->num_processes = comm_size;
886
887   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
888     extra->send_size += sendcounts[i];
889     extra->sendcounts[i] = sendcounts[i];
890     extra->recv_size += recvcounts[i];
891     extra->recvcounts[i] = recvcounts[i];
892   }
893   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
894   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
895
896   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
897
898   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
899                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
900
901   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
902   log_timed_action (action, clock);
903 }
904
905 }} // namespace simgrid::smpi
906
907 void smpi_replay_run(int *argc, char***argv){
908   /* First initializes everything */
909   simgrid::smpi::Process::init(argc, argv);
910   smpi_process()->mark_as_initialized();
911   smpi_process()->set_replaying(true);
912
913   int rank = smpi_process()->index();
914   TRACE_smpi_init(rank);
915   TRACE_smpi_computing_init(rank);
916   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
917   extra->type = TRACING_INIT;
918   char *operation =bprintf("%s_init",__FUNCTION__);
919   TRACE_smpi_collective_in(rank, -1, operation, extra);
920   TRACE_smpi_collective_out(rank, -1, operation);
921   xbt_free(operation);
922   xbt_replay_action_register("init",       simgrid::smpi::action_init);
923   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
924   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
925   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
926   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
927   xbt_replay_action_register("send",       simgrid::smpi::action_send);
928   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
929   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
930   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
931   xbt_replay_action_register("test",       simgrid::smpi::action_test);
932   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
933   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
934   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
935   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
936   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
937   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
938   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
939   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
940   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
941   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
942   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
943   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
944   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
945   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
946
947   //if we have a delayed start, sleep here.
948   if(*argc>2){
949     char *endptr;
950     double value = strtod((*argv)[2], &endptr);
951     if (*endptr != '\0')
952       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
953     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
954     smpi_execute_flops(value);
955   } else {
956     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
957     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
958     smpi_execute_flops(0.0);
959   }
960
961   /* Actually run the replay */
962   simgrid::xbt::replay_runner(*argc, *argv);
963
964   /* and now, finalize everything */
965   /* One active process will stop. Decrease the counter*/
966   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
967   if (!get_reqq_self()->empty()){
968     unsigned int count_requests=get_reqq_self()->size();
969     MPI_Request requests[count_requests];
970     MPI_Status status[count_requests];
971     unsigned int i=0;
972
973     for (auto req: *get_reqq_self()){
974       requests[i] = req;
975       i++;
976     }
977     simgrid::smpi::Request::waitall(count_requests, requests, status);
978   }
979   delete get_reqq_self();
980   active_processes--;
981
982   if(active_processes==0){
983     /* Last process alive speaking: end the simulated timer */
984     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
985     xbt_free(sendbuffer);
986     xbt_free(recvbuffer);
987   }
988
989   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
990   extra_fin->type = TRACING_FINALIZE;
991   operation =bprintf("%s_finalize",__FUNCTION__);
992   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
993
994   smpi_process()->finalize();
995
996   TRACE_smpi_collective_out(rank, -1, operation);
997   TRACE_smpi_finalize(smpi_process()->index());
998   xbt_free(operation);
999 }