Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
remove useless parameters in (simplified) instr
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 int communicator_size = 0;
23 static int active_processes = 0;
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
25
26 MPI_Datatype MPI_DEFAULT_TYPE;
27 MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(smpi_process()->index());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({smpi_process()->index(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90 static MPI_Datatype decode_datatype(const char *const action)
91 {
92   switch(atoi(action)) {
93     case 0:
94       MPI_CURRENT_TYPE=MPI_DOUBLE;
95       break;
96     case 1:
97       MPI_CURRENT_TYPE=MPI_INT;
98       break;
99     case 2:
100       MPI_CURRENT_TYPE=MPI_CHAR;
101       break;
102     case 3:
103       MPI_CURRENT_TYPE=MPI_SHORT;
104       break;
105     case 4:
106       MPI_CURRENT_TYPE=MPI_LONG;
107       break;
108     case 5:
109       MPI_CURRENT_TYPE=MPI_FLOAT;
110       break;
111     case 6:
112       MPI_CURRENT_TYPE=MPI_BYTE;
113       break;
114     default:
115       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
116       break;
117   }
118    return MPI_CURRENT_TYPE;
119 }
120
121 const char* encode_datatype(MPI_Datatype datatype, int* known)
122 {
123   //default type for output is set to MPI_BYTE
124   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
125   if(known!=nullptr)
126     *known=1;
127   if (datatype==MPI_BYTE)
128       return "";
129   if(datatype==MPI_DOUBLE)
130       return "0";
131   if(datatype==MPI_INT)
132       return "1";
133   if(datatype==MPI_CHAR)
134       return "2";
135   if(datatype==MPI_SHORT)
136       return "3";
137   if(datatype==MPI_LONG)
138     return "4";
139   if(datatype==MPI_FLOAT)
140       return "5";
141   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
142   if(known!=nullptr)
143     *known=0;
144   // default - not implemented.
145   // do not warn here as we pass in this function even for other trace formats
146   return "-1";
147 }
148
149 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
150     int i=0;\
151     while(action[i]!=nullptr)\
152      i++;\
153     if(i<mandatory+2)                                           \
154     THROWF(arg_error, 0, "%s replay failed.\n" \
155           "%d items were given on the line. First two should be process_id and action.  " \
156           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
157           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
158   }
159
160 namespace simgrid {
161 namespace smpi {
162
163 static void action_init(const char *const *action)
164 {
165   XBT_DEBUG("Initialize the counters");
166   CHECK_ACTION_PARAMS(action, 0, 1)
167   if(action[2])
168     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype
169   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
170
171   /* start a simulated timer */
172   smpi_process()->simulated_start();
173   /*initialize the number of active processes */
174   active_processes = smpi_process_count();
175
176   set_reqq_self(new std::vector<MPI_Request>);
177 }
178
179 static void action_finalize(const char *const *action)
180 {
181   /* Nothing to do */
182 }
183
184 static void action_comm_size(const char *const *action)
185 {
186   communicator_size = parse_double(action[2]);
187   log_timed_action (action, smpi_process()->simulated_elapsed());
188 }
189
190 static void action_comm_split(const char *const *action)
191 {
192   log_timed_action (action, smpi_process()->simulated_elapsed());
193 }
194
195 static void action_comm_dup(const char *const *action)
196 {
197   log_timed_action (action, smpi_process()->simulated_elapsed());
198 }
199
200 static void action_compute(const char *const *action)
201 {
202   CHECK_ACTION_PARAMS(action, 1, 0)
203   double clock = smpi_process()->simulated_elapsed();
204   double flops= parse_double(action[2]);
205   int rank = smpi_process()->index();
206   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
207   extra->type=TRACING_COMPUTING;
208   extra->comp_size=flops;
209   TRACE_smpi_computing_in(rank, extra);
210
211   smpi_execute_flops(flops);
212
213   TRACE_smpi_computing_out(rank);
214   log_timed_action (action, clock);
215 }
216
217 static void action_send(const char *const *action)
218 {
219   CHECK_ACTION_PARAMS(action, 2, 1)
220   int to = atoi(action[2]);
221   double size=parse_double(action[3]);
222   double clock = smpi_process()->simulated_elapsed();
223
224   if(action[4])
225     MPI_CURRENT_TYPE=decode_datatype(action[4]);
226   else
227     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
228
229   int rank = smpi_process()->index();
230
231   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
232   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233   extra->type = TRACING_SEND;
234   extra->send_size = size;
235   extra->src = rank;
236   extra->dst = dst_traced;
237   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
238   TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
239   if (not TRACE_smpi_view_internals())
240     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
241
242   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
243
244   log_timed_action (action, clock);
245
246   TRACE_smpi_ptp_out(rank);
247 }
248
249 static void action_Isend(const char *const *action)
250 {
251   CHECK_ACTION_PARAMS(action, 2, 1)
252   int to = atoi(action[2]);
253   double size=parse_double(action[3]);
254   double clock = smpi_process()->simulated_elapsed();
255
256   if(action[4])
257     MPI_CURRENT_TYPE=decode_datatype(action[4]);
258   else
259     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
260
261   int rank = smpi_process()->index();
262   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
263   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
264   extra->type = TRACING_ISEND;
265   extra->send_size = size;
266   extra->src = rank;
267   extra->dst = dst_traced;
268   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
269   TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
270   if (not TRACE_smpi_view_internals())
271     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
272
273   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
274
275   TRACE_smpi_ptp_out(rank);
276
277   get_reqq_self()->push_back(request);
278
279   log_timed_action (action, clock);
280 }
281
282 static void action_recv(const char *const *action) {
283   CHECK_ACTION_PARAMS(action, 2, 1)
284   int from = atoi(action[2]);
285   double size=parse_double(action[3]);
286   double clock = smpi_process()->simulated_elapsed();
287   MPI_Status status;
288
289   if(action[4])
290     MPI_CURRENT_TYPE=decode_datatype(action[4]);
291   else
292     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
293
294   int rank = smpi_process()->index();
295   int src_traced = MPI_COMM_WORLD->group()->rank(from);
296
297   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
298   extra->type = TRACING_RECV;
299   extra->send_size = size;
300   extra->src = src_traced;
301   extra->dst = rank;
302   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
303   TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
304
305   //unknown size from the receiver point of view
306   if(size<=0.0){
307     Request::probe(from, 0, MPI_COMM_WORLD, &status);
308     size=status.count;
309   }
310
311   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
312
313   TRACE_smpi_ptp_out(rank);
314   if (not TRACE_smpi_view_internals()) {
315     TRACE_smpi_recv(src_traced, rank, 0);
316   }
317
318   log_timed_action (action, clock);
319 }
320
321 static void action_Irecv(const char *const *action)
322 {
323   CHECK_ACTION_PARAMS(action, 2, 1)
324   int from = atoi(action[2]);
325   double size=parse_double(action[3]);
326   double clock = smpi_process()->simulated_elapsed();
327
328   if(action[4])
329     MPI_CURRENT_TYPE=decode_datatype(action[4]);
330   else
331     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
332
333   int rank = smpi_process()->index();
334   int src_traced = MPI_COMM_WORLD->group()->rank(from);
335   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
336   extra->type = TRACING_IRECV;
337   extra->send_size = size;
338   extra->src = src_traced;
339   extra->dst = rank;
340   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
341   TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
342   MPI_Status status;
343   //unknow size from the receiver pov
344   if(size<=0.0){
345       Request::probe(from, 0, MPI_COMM_WORLD, &status);
346       size=status.count;
347   }
348
349   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
350
351   TRACE_smpi_ptp_out(rank);
352   get_reqq_self()->push_back(request);
353
354   log_timed_action (action, clock);
355 }
356
357 static void action_test(const char *const *action){
358   CHECK_ACTION_PARAMS(action, 0, 0)
359   double clock = smpi_process()->simulated_elapsed();
360   MPI_Status status;
361
362   MPI_Request request = get_reqq_self()->back();
363   get_reqq_self()->pop_back();
364   //if request is null here, this may mean that a previous test has succeeded
365   //Different times in traced application and replayed version may lead to this
366   //In this case, ignore the extra calls.
367   if(request!=nullptr){
368     int rank = smpi_process()->index();
369     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
370     extra->type=TRACING_TEST;
371     TRACE_smpi_testing_in(rank, extra);
372
373     int flag = Request::test(&request, &status);
374
375     XBT_DEBUG("MPI_Test result: %d", flag);
376     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
377     get_reqq_self()->push_back(request);
378
379     TRACE_smpi_testing_out(rank);
380   }
381   log_timed_action (action, clock);
382 }
383
384 static void action_wait(const char *const *action){
385   CHECK_ACTION_PARAMS(action, 0, 0)
386   double clock = smpi_process()->simulated_elapsed();
387   MPI_Status status;
388
389   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
390       xbt_str_join_array(action," "));
391   MPI_Request request = get_reqq_self()->back();
392   get_reqq_self()->pop_back();
393
394   if (request==nullptr){
395     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
396     return;
397   }
398
399   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
400
401   MPI_Group group = request->comm()->group();
402   int src_traced = group->rank(request->src());
403   int dst_traced = group->rank(request->dst());
404   int is_wait_for_receive = (request->flags() & RECV);
405   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
406   extra->type = TRACING_WAIT;
407   TRACE_smpi_ptp_in(rank, __FUNCTION__, extra);
408
409   Request::wait(&request, &status);
410
411   TRACE_smpi_ptp_out(rank);
412   if (is_wait_for_receive)
413     TRACE_smpi_recv(src_traced, dst_traced, 0);
414   log_timed_action (action, clock);
415 }
416
417 static void action_waitall(const char *const *action){
418   CHECK_ACTION_PARAMS(action, 0, 0)
419   double clock = smpi_process()->simulated_elapsed();
420   const unsigned int count_requests = get_reqq_self()->size();
421
422   if (count_requests>0) {
423     MPI_Status status[count_requests];
424
425    int rank_traced = smpi_process()->index();
426    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
427    extra->type = TRACING_WAITALL;
428    extra->send_size=count_requests;
429    TRACE_smpi_ptp_in(rank_traced, __FUNCTION__,extra);
430    int recvs_snd[count_requests];
431    int recvs_rcv[count_requests];
432    for (unsigned int i = 0; i < count_requests; i++) {
433      const auto& req = (*get_reqq_self())[i];
434      if (req && (req->flags () & RECV)){
435        recvs_snd[i]=req->src();
436        recvs_rcv[i]=req->dst();
437      }else
438        recvs_snd[i]=-100;
439    }
440    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
441
442    for (unsigned i = 0; i < count_requests; i++) {
443      if (recvs_snd[i]!=-100)
444        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
445    }
446    TRACE_smpi_ptp_out(rank_traced);
447   }
448   log_timed_action (action, clock);
449 }
450
451 static void action_barrier(const char *const *action){
452   double clock = smpi_process()->simulated_elapsed();
453   int rank = smpi_process()->index();
454   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
455   extra->type = TRACING_BARRIER;
456   TRACE_smpi_collective_in(rank, __FUNCTION__, extra);
457
458   Colls::barrier(MPI_COMM_WORLD);
459
460   TRACE_smpi_collective_out(rank);
461   log_timed_action (action, clock);
462 }
463
464 static void action_bcast(const char *const *action)
465 {
466   CHECK_ACTION_PARAMS(action, 1, 2)
467   double size = parse_double(action[2]);
468   double clock = smpi_process()->simulated_elapsed();
469   int root=0;
470   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
471   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
472
473   if(action[3]) {
474     root= atoi(action[3]);
475     if(action[4])
476       MPI_CURRENT_TYPE=decode_datatype(action[4]);
477   }
478
479   int rank = smpi_process()->index();
480   int root_traced = MPI_COMM_WORLD->group()->index(root);
481
482   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
483   extra->type = TRACING_BCAST;
484   extra->send_size = size;
485   extra->root = root_traced;
486   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
487   TRACE_smpi_collective_in(rank, __FUNCTION__, extra);
488   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
489
490   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
491
492   TRACE_smpi_collective_out(rank);
493   log_timed_action (action, clock);
494 }
495
496 static void action_reduce(const char *const *action)
497 {
498   CHECK_ACTION_PARAMS(action, 2, 2)
499   double comm_size = parse_double(action[2]);
500   double comp_size = parse_double(action[3]);
501   double clock = smpi_process()->simulated_elapsed();
502   int root=0;
503   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
504
505   if(action[4]) {
506     root= atoi(action[4]);
507     if(action[5])
508       MPI_CURRENT_TYPE=decode_datatype(action[5]);
509   }
510
511   int rank = smpi_process()->index();
512   int root_traced = MPI_COMM_WORLD->group()->rank(root);
513   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
514   extra->type = TRACING_REDUCE;
515   extra->send_size = comm_size;
516   extra->comp_size = comp_size;
517   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
518   extra->root = root_traced;
519
520   TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
521
522   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
523   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
524   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
525   smpi_execute_flops(comp_size);
526
527   TRACE_smpi_collective_out(rank);
528   log_timed_action (action, clock);
529 }
530
531 static void action_allReduce(const char *const *action) {
532   CHECK_ACTION_PARAMS(action, 2, 1)
533   double comm_size = parse_double(action[2]);
534   double comp_size = parse_double(action[3]);
535
536   if(action[4])
537     MPI_CURRENT_TYPE=decode_datatype(action[4]);
538   else
539     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
540
541   double clock = smpi_process()->simulated_elapsed();
542   int rank = smpi_process()->index();
543   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
544   extra->type = TRACING_ALLREDUCE;
545   extra->send_size = comm_size;
546   extra->comp_size = comp_size;
547   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
548   TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
549
550   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
551   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
552   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
553   smpi_execute_flops(comp_size);
554
555   TRACE_smpi_collective_out(rank);
556   log_timed_action (action, clock);
557 }
558
559 static void action_allToAll(const char *const *action) {
560   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
561   double clock = smpi_process()->simulated_elapsed();
562   int comm_size = MPI_COMM_WORLD->size();
563   int send_size = parse_double(action[2]);
564   int recv_size = parse_double(action[3]);
565   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
566
567   if(action[4] && action[5]) {
568     MPI_CURRENT_TYPE=decode_datatype(action[4]);
569     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
570   }
571   else
572     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
573
574   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
575   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
576
577   int rank = smpi_process()->index();
578   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
579   extra->type = TRACING_ALLTOALL;
580   extra->send_size = send_size;
581   extra->recv_size = recv_size;
582   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
583   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
584
585   TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
586
587   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
588
589   TRACE_smpi_collective_out(rank);
590   log_timed_action (action, clock);
591 }
592
593 static void action_gather(const char *const *action) {
594   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
595         0 gather 68 68 0 0 0
596       where:
597         1) 68 is the sendcounts
598         2) 68 is the recvcounts
599         3) 0 is the root node
600         4) 0 is the send datatype id, see decode_datatype()
601         5) 0 is the recv datatype id, see decode_datatype()
602   */
603   CHECK_ACTION_PARAMS(action, 2, 3)
604   double clock = smpi_process()->simulated_elapsed();
605   int comm_size = MPI_COMM_WORLD->size();
606   int send_size = parse_double(action[2]);
607   int recv_size = parse_double(action[3]);
608   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
609   if(action[4] && action[5]) {
610     MPI_CURRENT_TYPE=decode_datatype(action[5]);
611     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
612   } else {
613     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
614   }
615   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
616   void *recv = nullptr;
617   int root=0;
618   if(action[4])
619     root=atoi(action[4]);
620   int rank = MPI_COMM_WORLD->rank();
621
622   if(rank==root)
623     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
624
625   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
626   extra->type = TRACING_GATHER;
627   extra->send_size = send_size;
628   extra->recv_size = recv_size;
629   extra->root = root;
630   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
631   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
632
633   TRACE_smpi_collective_in(smpi_process()->index(), __FUNCTION__, extra);
634
635   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
636
637   TRACE_smpi_collective_out(smpi_process()->index());
638   log_timed_action (action, clock);
639 }
640
641 static void action_gatherv(const char *const *action) {
642   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
643        0 gather 68 68 10 10 10 0 0 0
644      where:
645        1) 68 is the sendcount
646        2) 68 10 10 10 is the recvcounts
647        3) 0 is the root node
648        4) 0 is the send datatype id, see decode_datatype()
649        5) 0 is the recv datatype id, see decode_datatype()
650   */
651   double clock = smpi_process()->simulated_elapsed();
652   int comm_size = MPI_COMM_WORLD->size();
653   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
654   int send_size = parse_double(action[2]);
655   int disps[comm_size];
656   int recvcounts[comm_size];
657   int recv_sum=0;
658
659   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
660   if(action[4+comm_size] && action[5+comm_size]) {
661     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
662     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
663   } else
664     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
665
666   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
667   void *recv = nullptr;
668   for(int i=0;i<comm_size;i++) {
669     recvcounts[i] = atoi(action[i+3]);
670     recv_sum=recv_sum+recvcounts[i];
671     disps[i]=0;
672   }
673
674   int root=atoi(action[3+comm_size]);
675   int rank = MPI_COMM_WORLD->rank();
676
677   if(rank==root)
678     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
679
680   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
681   extra->type = TRACING_GATHERV;
682   extra->send_size = send_size;
683   extra->recvcounts= xbt_new(int,comm_size);
684   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
685     extra->recvcounts[i] = recvcounts[i];
686   extra->root = root;
687   extra->num_processes = comm_size;
688   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
689   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
690
691   TRACE_smpi_collective_in(smpi_process()->index(), __FUNCTION__, extra);
692
693   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
694
695   TRACE_smpi_collective_out(smpi_process()->index());
696   log_timed_action (action, clock);
697 }
698
699 static void action_reducescatter(const char *const *action) {
700  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
701       0 reduceScatter 275427 275427 275427 204020 11346849 0
702     where:
703       1) The first four values after the name of the action declare the recvcounts array
704       2) The value 11346849 is the amount of instructions
705       3) The last value corresponds to the datatype, see decode_datatype().
706 */
707   double clock = smpi_process()->simulated_elapsed();
708   int comm_size = MPI_COMM_WORLD->size();
709   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
710   int comp_size = parse_double(action[2+comm_size]);
711   int recvcounts[comm_size];
712   int rank = smpi_process()->index();
713   int size = 0;
714   if(action[3+comm_size])
715     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
716   else
717     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
718
719   for(int i=0;i<comm_size;i++) {
720     recvcounts[i] = atoi(action[i+2]);
721     size+=recvcounts[i];
722   }
723
724   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
725   extra->type = TRACING_REDUCE_SCATTER;
726   extra->send_size = 0;
727   extra->recvcounts= xbt_new(int, comm_size);
728   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
729     extra->recvcounts[i] = recvcounts[i];
730   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
731   extra->comp_size = comp_size;
732   extra->num_processes = comm_size;
733
734   TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
735
736   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
737   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
738
739   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
740   smpi_execute_flops(comp_size);
741
742   TRACE_smpi_collective_out(rank);
743   log_timed_action (action, clock);
744 }
745
746 static void action_allgather(const char *const *action) {
747   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
748         0 allGather 275427 275427
749     where:
750         1) 275427 is the sendcount
751         2) 275427 is the recvcount
752         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
753   */
754   double clock = smpi_process()->simulated_elapsed();
755
756   CHECK_ACTION_PARAMS(action, 2, 2)
757   int sendcount=atoi(action[2]);
758   int recvcount=atoi(action[3]);
759
760   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
761
762   if(action[4] && action[5]) {
763     MPI_CURRENT_TYPE = decode_datatype(action[4]);
764     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
765   } else
766     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
767
768   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
769   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
770
771   int rank = smpi_process()->index();
772   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
773   extra->type = TRACING_ALLGATHER;
774   extra->send_size = sendcount;
775   extra->recv_size= recvcount;
776   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
777   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
778   extra->num_processes = MPI_COMM_WORLD->size();
779
780   TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
781
782   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
783
784   TRACE_smpi_collective_out(rank);
785   log_timed_action (action, clock);
786 }
787
788 static void action_allgatherv(const char *const *action) {
789   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
790         0 allGatherV 275427 275427 275427 275427 204020
791      where:
792         1) 275427 is the sendcount
793         2) The next four elements declare the recvcounts array
794         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
795   */
796   double clock = smpi_process()->simulated_elapsed();
797
798   int comm_size = MPI_COMM_WORLD->size();
799   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
800   int sendcount=atoi(action[2]);
801   int recvcounts[comm_size];
802   int disps[comm_size];
803   int recv_sum=0;
804   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
805
806   if(action[3+comm_size] && action[4+comm_size]) {
807     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
808     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
809   } else
810     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
811
812   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
813
814   for(int i=0;i<comm_size;i++) {
815     recvcounts[i] = atoi(action[i+3]);
816     recv_sum=recv_sum+recvcounts[i];
817     disps[i] = 0;
818   }
819   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
820
821   int rank = smpi_process()->index();
822   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
823   extra->type = TRACING_ALLGATHERV;
824   extra->send_size = sendcount;
825   extra->recvcounts= xbt_new(int, comm_size);
826   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
827     extra->recvcounts[i] = recvcounts[i];
828   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
829   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
830   extra->num_processes = comm_size;
831
832   TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
833
834   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
835                           MPI_COMM_WORLD);
836
837   TRACE_smpi_collective_out(rank);
838   log_timed_action (action, clock);
839 }
840
841 static void action_allToAllv(const char *const *action) {
842   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
843         0 allToAllV 100 1 7 10 12 100 1 70 10 5
844      where:
845         1) 100 is the size of the send buffer *sizeof(int),
846         2) 1 7 10 12 is the sendcounts array
847         3) 100*sizeof(int) is the size of the receiver buffer
848         4)  1 70 10 5 is the recvcounts array
849   */
850   double clock = smpi_process()->simulated_elapsed();
851
852   int comm_size = MPI_COMM_WORLD->size();
853   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
854   int sendcounts[comm_size];
855   int recvcounts[comm_size];
856   int senddisps[comm_size];
857   int recvdisps[comm_size];
858
859   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
860
861   int send_buf_size=parse_double(action[2]);
862   int recv_buf_size=parse_double(action[3+comm_size]);
863   if(action[4+2*comm_size] && action[5+2*comm_size]) {
864     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
865     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
866   }
867   else
868     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
869
870   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
871   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
872
873   for(int i=0;i<comm_size;i++) {
874     sendcounts[i] = atoi(action[i+3]);
875     recvcounts[i] = atoi(action[i+4+comm_size]);
876     senddisps[i] = 0;
877     recvdisps[i] = 0;
878   }
879
880   int rank = smpi_process()->index();
881   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
882   extra->type = TRACING_ALLTOALLV;
883   extra->recvcounts= xbt_new(int, comm_size);
884   extra->sendcounts= xbt_new(int, comm_size);
885   extra->num_processes = comm_size;
886
887   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
888     extra->send_size += sendcounts[i];
889     extra->sendcounts[i] = sendcounts[i];
890     extra->recv_size += recvcounts[i];
891     extra->recvcounts[i] = recvcounts[i];
892   }
893   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
894   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
895
896   TRACE_smpi_collective_in(rank, __FUNCTION__,extra);
897
898   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
899                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
900
901   TRACE_smpi_collective_out(rank);
902   log_timed_action (action, clock);
903 }
904
905 }} // namespace simgrid::smpi
906
907 /** @brief Only initialize the replay, don't do it for real */
908 void smpi_replay_init(int* argc, char*** argv)
909 {
910   simgrid::smpi::Process::init(argc, argv);
911   smpi_process()->mark_as_initialized();
912   smpi_process()->set_replaying(true);
913
914   int rank = smpi_process()->index();
915   TRACE_smpi_init(rank);
916   TRACE_smpi_computing_init(rank);
917   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
918   extra->type = TRACING_INIT;
919   TRACE_smpi_collective_in(rank, "smpi_replay_run_init", extra);
920   TRACE_smpi_collective_out(rank);
921   xbt_replay_action_register("init",       simgrid::smpi::action_init);
922   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
923   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
924   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
925   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
926   xbt_replay_action_register("send",       simgrid::smpi::action_send);
927   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
928   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
929   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
930   xbt_replay_action_register("test",       simgrid::smpi::action_test);
931   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
932   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
933   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
934   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
935   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
936   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
937   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
938   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
939   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
940   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
941   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
942   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
943   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
944   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
945
946   //if we have a delayed start, sleep here.
947   if(*argc>2){
948     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
949     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
950     smpi_execute_flops(value);
951   } else {
952     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
953     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
954     smpi_execute_flops(0.0);
955   }
956 }
957
958 /** @brief actually run the replay after initialization */
959 void smpi_replay_main(int* argc, char*** argv)
960 {
961   simgrid::xbt::replay_runner(*argc, *argv);
962
963   /* and now, finalize everything */
964   /* One active process will stop. Decrease the counter*/
965   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
966   if (not get_reqq_self()->empty()) {
967     unsigned int count_requests=get_reqq_self()->size();
968     MPI_Request requests[count_requests];
969     MPI_Status status[count_requests];
970     unsigned int i=0;
971
972     for (auto const& req : *get_reqq_self()) {
973       requests[i] = req;
974       i++;
975     }
976     simgrid::smpi::Request::waitall(count_requests, requests, status);
977   }
978   delete get_reqq_self();
979   active_processes--;
980
981   if(active_processes==0){
982     /* Last process alive speaking: end the simulated timer */
983     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
984     xbt_free(sendbuffer);
985     xbt_free(recvbuffer);
986   }
987
988   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
989   extra_fin->type = TRACING_FINALIZE;
990   TRACE_smpi_collective_in(smpi_process()->index(), "smpi_replay_run_finalize", extra_fin);
991
992   smpi_process()->finalize();
993
994   TRACE_smpi_collective_out(smpi_process()->index());
995   TRACE_smpi_finalize(smpi_process()->index());
996 }
997
998 /** @brief chain a replay initialization and a replay start */
999 void smpi_replay_run(int* argc, char*** argv)
1000 {
1001   smpi_replay_init(argc, argv);
1002   smpi_replay_main(argc, argv);
1003 }