Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
don't use is_valid to validate datatypes for replay/ti-tracing
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 int communicator_size = 0;
23 static int active_processes = 0;
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
25
26 MPI_Datatype MPI_DEFAULT_TYPE;
27 MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(smpi_process()->index());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({smpi_process()->index(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90
91 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
92 static MPI_Datatype decode_datatype(const char *const action)
93 {
94   switch(atoi(action)) {
95     case 0:
96       MPI_CURRENT_TYPE=MPI_DOUBLE;
97       break;
98     case 1:
99       MPI_CURRENT_TYPE=MPI_INT;
100       break;
101     case 2:
102       MPI_CURRENT_TYPE=MPI_CHAR;
103       break;
104     case 3:
105       MPI_CURRENT_TYPE=MPI_SHORT;
106       break;
107     case 4:
108       MPI_CURRENT_TYPE=MPI_LONG;
109       break;
110     case 5:
111       MPI_CURRENT_TYPE=MPI_FLOAT;
112       break;
113     case 6:
114       MPI_CURRENT_TYPE=MPI_BYTE;
115       break;
116     default:
117       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118       break;
119   }
120    return MPI_CURRENT_TYPE;
121 }
122
123 const char* encode_datatype(MPI_Datatype datatype)
124 {
125   if (datatype==MPI_BYTE)
126       return "";
127   if(datatype==MPI_DOUBLE)
128       return "0";
129   if(datatype==MPI_INT)
130       return "1";
131   if(datatype==MPI_CHAR)
132       return "2";
133   if(datatype==MPI_SHORT)
134       return "3";
135   if(datatype==MPI_LONG)
136     return "4";
137   if(datatype==MPI_FLOAT)
138       return "5";
139   // default - not implemented.
140   // do not warn here as we pass in this function even for other trace formats
141   return "-1";
142 }
143
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145     int i=0;\
146     while(action[i]!=nullptr)\
147      i++;\
148     if(i<mandatory+2)                                           \
149     THROWF(arg_error, 0, "%s replay failed.\n" \
150           "%d items were given on the line. First two should be process_id and action.  " \
151           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
153   }
154
155 namespace simgrid {
156 namespace smpi {
157
158 static void action_init(const char *const *action)
159 {
160   XBT_DEBUG("Initialize the counters");
161   CHECK_ACTION_PARAMS(action, 0, 1)
162   if(action[2])
163     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
164   else
165     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process()->simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* Nothing to do */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   communicator_size = parse_double(action[2]);
183   log_timed_action (action, smpi_process()->simulated_elapsed());
184 }
185
186 static void action_comm_split(const char *const *action)
187 {
188   log_timed_action (action, smpi_process()->simulated_elapsed());
189 }
190
191 static void action_comm_dup(const char *const *action)
192 {
193   log_timed_action (action, smpi_process()->simulated_elapsed());
194 }
195
196 static void action_compute(const char *const *action)
197 {
198   CHECK_ACTION_PARAMS(action, 1, 0)
199   double clock = smpi_process()->simulated_elapsed();
200   double flops= parse_double(action[2]);
201   int rank = smpi_process()->index();
202
203   TRACE_smpi_computing_in(rank, flops);
204   smpi_execute_flops(flops);
205   TRACE_smpi_computing_out(rank);
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_send(const char *const *action)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   int to = atoi(action[2]);
214   double size=parse_double(action[3]);
215   double clock = smpi_process()->simulated_elapsed();
216
217   if(action[4])
218     MPI_CURRENT_TYPE=decode_datatype(action[4]);
219   else
220     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
221
222   int rank = smpi_process()->index();
223   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
224
225   TRACE_smpi_comm_in(rank, __FUNCTION__,
226                      new simgrid::instr::Pt2PtTIData("send", dst_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
227   if (not TRACE_smpi_view_internals())
228     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
229
230   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
231
232   TRACE_smpi_comm_out(rank);
233
234   log_timed_action(action, clock);
235 }
236
237 static void action_Isend(const char *const *action)
238 {
239   CHECK_ACTION_PARAMS(action, 2, 1)
240   int to = atoi(action[2]);
241   double size=parse_double(action[3]);
242   double clock = smpi_process()->simulated_elapsed();
243
244   if(action[4])
245     MPI_CURRENT_TYPE=decode_datatype(action[4]);
246   else
247     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
248
249   int rank = smpi_process()->index();
250   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
251   TRACE_smpi_comm_in(rank, __FUNCTION__,
252                      new simgrid::instr::Pt2PtTIData("Isend", dst_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
253   if (not TRACE_smpi_view_internals())
254     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
255
256   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
257
258   TRACE_smpi_comm_out(rank);
259
260   get_reqq_self()->push_back(request);
261
262   log_timed_action (action, clock);
263 }
264
265 static void action_recv(const char *const *action) {
266   CHECK_ACTION_PARAMS(action, 2, 1)
267   int from = atoi(action[2]);
268   double size=parse_double(action[3]);
269   double clock = smpi_process()->simulated_elapsed();
270   MPI_Status status;
271
272   if(action[4])
273     MPI_CURRENT_TYPE=decode_datatype(action[4]);
274   else
275     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
276
277   int rank = smpi_process()->index();
278   int src_traced = MPI_COMM_WORLD->group()->rank(from);
279
280   TRACE_smpi_comm_in(rank, __FUNCTION__,
281                      new simgrid::instr::Pt2PtTIData("recv", src_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
282
283   //unknown size from the receiver point of view
284   if (size <= 0.0) {
285     Request::probe(from, 0, MPI_COMM_WORLD, &status);
286     size=status.count;
287   }
288
289   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
290
291   TRACE_smpi_comm_out(rank);
292   if (not TRACE_smpi_view_internals()) {
293     TRACE_smpi_recv(src_traced, rank, 0);
294   }
295
296   log_timed_action (action, clock);
297 }
298
299 static void action_Irecv(const char *const *action)
300 {
301   CHECK_ACTION_PARAMS(action, 2, 1)
302   int from = atoi(action[2]);
303   double size=parse_double(action[3]);
304   double clock = smpi_process()->simulated_elapsed();
305
306   if(action[4])
307     MPI_CURRENT_TYPE=decode_datatype(action[4]);
308   else
309     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
310
311   int rank = smpi_process()->index();
312   int src_traced = MPI_COMM_WORLD->group()->rank(from);
313   TRACE_smpi_comm_in(rank, __FUNCTION__,
314                      new simgrid::instr::Pt2PtTIData("Irecv", src_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
315   MPI_Status status;
316   //unknow size from the receiver pov
317   if (size <= 0.0) {
318     Request::probe(from, 0, MPI_COMM_WORLD, &status);
319     size = status.count;
320   }
321
322   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
323
324   TRACE_smpi_comm_out(rank);
325   get_reqq_self()->push_back(request);
326
327   log_timed_action (action, clock);
328 }
329
330 static void action_test(const char* const* action)
331 {
332   CHECK_ACTION_PARAMS(action, 0, 0)
333   double clock = smpi_process()->simulated_elapsed();
334   MPI_Status status;
335
336   MPI_Request request = get_reqq_self()->back();
337   get_reqq_self()->pop_back();
338   //if request is null here, this may mean that a previous test has succeeded
339   //Different times in traced application and replayed version may lead to this
340   //In this case, ignore the extra calls.
341   if(request!=nullptr){
342     int rank = smpi_process()->index();
343     TRACE_smpi_testing_in(rank);
344
345     int flag = Request::test(&request, &status);
346
347     XBT_DEBUG("MPI_Test result: %d", flag);
348     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
349     get_reqq_self()->push_back(request);
350
351     TRACE_smpi_testing_out(rank);
352   }
353   log_timed_action (action, clock);
354 }
355
356 static void action_wait(const char *const *action){
357   CHECK_ACTION_PARAMS(action, 0, 0)
358   double clock = smpi_process()->simulated_elapsed();
359   MPI_Status status;
360
361   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
362       xbt_str_join_array(action," "));
363   MPI_Request request = get_reqq_self()->back();
364   get_reqq_self()->pop_back();
365
366   if (request==nullptr){
367     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
368     return;
369   }
370
371   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
372
373   MPI_Group group = request->comm()->group();
374   int src_traced = group->rank(request->src());
375   int dst_traced = group->rank(request->dst());
376   int is_wait_for_receive = (request->flags() & RECV);
377   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
378
379   Request::wait(&request, &status);
380
381   TRACE_smpi_comm_out(rank);
382   if (is_wait_for_receive)
383     TRACE_smpi_recv(src_traced, dst_traced, 0);
384   log_timed_action (action, clock);
385 }
386
387 static void action_waitall(const char *const *action){
388   CHECK_ACTION_PARAMS(action, 0, 0)
389   double clock = smpi_process()->simulated_elapsed();
390   const unsigned int count_requests = get_reqq_self()->size();
391
392   if (count_requests>0) {
393     MPI_Status status[count_requests];
394
395    int rank_traced = smpi_process()->index();
396    TRACE_smpi_comm_in(rank_traced, __FUNCTION__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
397    int recvs_snd[count_requests];
398    int recvs_rcv[count_requests];
399    for (unsigned int i = 0; i < count_requests; i++) {
400      const auto& req = (*get_reqq_self())[i];
401      if (req && (req->flags () & RECV)){
402        recvs_snd[i]=req->src();
403        recvs_rcv[i]=req->dst();
404      }else
405        recvs_snd[i]=-100;
406    }
407    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
408
409    for (unsigned i = 0; i < count_requests; i++) {
410      if (recvs_snd[i]!=-100)
411        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
412    }
413    TRACE_smpi_comm_out(rank_traced);
414   }
415   log_timed_action (action, clock);
416 }
417
418 static void action_barrier(const char *const *action){
419   double clock = smpi_process()->simulated_elapsed();
420   int rank = smpi_process()->index();
421   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
422
423   Colls::barrier(MPI_COMM_WORLD);
424
425   TRACE_smpi_comm_out(rank);
426   log_timed_action (action, clock);
427 }
428
429 static void action_bcast(const char *const *action)
430 {
431   CHECK_ACTION_PARAMS(action, 1, 2)
432   double size = parse_double(action[2]);
433   double clock = smpi_process()->simulated_elapsed();
434   int root=0;
435   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
436   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
437
438   if(action[3]) {
439     root= atoi(action[3]);
440     if(action[4])
441       MPI_CURRENT_TYPE=decode_datatype(action[4]);
442   }
443
444   int rank = smpi_process()->index();
445   TRACE_smpi_comm_in(rank, __FUNCTION__,
446                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->index(root), -1.0, size, -1,
447                                                     encode_datatype(MPI_CURRENT_TYPE), ""));
448
449   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
450
451   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
452
453   TRACE_smpi_comm_out(rank);
454   log_timed_action (action, clock);
455 }
456
457 static void action_reduce(const char *const *action)
458 {
459   CHECK_ACTION_PARAMS(action, 2, 2)
460   double comm_size = parse_double(action[2]);
461   double comp_size = parse_double(action[3]);
462   double clock = smpi_process()->simulated_elapsed();
463   int root=0;
464   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
465
466   if(action[4]) {
467     root= atoi(action[4]);
468     if(action[5])
469       MPI_CURRENT_TYPE=decode_datatype(action[5]);
470   }
471
472   int rank = smpi_process()->index();
473   TRACE_smpi_comm_in(rank, __FUNCTION__,
474                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->index(root), comp_size,
475                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
476
477   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
479   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
480   smpi_execute_flops(comp_size);
481
482   TRACE_smpi_comm_out(rank);
483   log_timed_action (action, clock);
484 }
485
486 static void action_allReduce(const char *const *action) {
487   CHECK_ACTION_PARAMS(action, 2, 1)
488   double comm_size = parse_double(action[2]);
489   double comp_size = parse_double(action[3]);
490
491   if(action[4])
492     MPI_CURRENT_TYPE=decode_datatype(action[4]);
493   else
494     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
495
496   double clock = smpi_process()->simulated_elapsed();
497   int rank = smpi_process()->index();
498   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
499                                                                         encode_datatype(MPI_CURRENT_TYPE), ""));
500
501   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
502   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
503   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
504   smpi_execute_flops(comp_size);
505
506   TRACE_smpi_comm_out(rank);
507   log_timed_action (action, clock);
508 }
509
510 static void action_allToAll(const char *const *action) {
511   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
512   double clock = smpi_process()->simulated_elapsed();
513   int comm_size = MPI_COMM_WORLD->size();
514   int send_size = parse_double(action[2]);
515   int recv_size = parse_double(action[3]);
516   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
517
518   if(action[4] && action[5]) {
519     MPI_CURRENT_TYPE=decode_datatype(action[4]);
520     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
521   }
522   else
523     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
524
525   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
526   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
527
528   int rank = smpi_process()->index();
529   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
530                                                                         encode_datatype(MPI_CURRENT_TYPE),
531                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
532
533   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
534
535   TRACE_smpi_comm_out(rank);
536   log_timed_action (action, clock);
537 }
538
539 static void action_gather(const char *const *action) {
540   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
541         0 gather 68 68 0 0 0
542       where:
543         1) 68 is the sendcounts
544         2) 68 is the recvcounts
545         3) 0 is the root node
546         4) 0 is the send datatype id, see decode_datatype()
547         5) 0 is the recv datatype id, see decode_datatype()
548   */
549   CHECK_ACTION_PARAMS(action, 2, 3)
550   double clock = smpi_process()->simulated_elapsed();
551   int comm_size = MPI_COMM_WORLD->size();
552   int send_size = parse_double(action[2]);
553   int recv_size = parse_double(action[3]);
554   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
555   if(action[4] && action[5]) {
556     MPI_CURRENT_TYPE=decode_datatype(action[5]);
557     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
558   } else {
559     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
560   }
561   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
562   void *recv = nullptr;
563   int root=0;
564   if(action[4])
565     root=atoi(action[4]);
566   int rank = MPI_COMM_WORLD->rank();
567
568   if(rank==root)
569     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
570
571   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
572                                                                         encode_datatype(MPI_CURRENT_TYPE),
573                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
574
575   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
576
577   TRACE_smpi_comm_out(smpi_process()->index());
578   log_timed_action (action, clock);
579 }
580
581 static void action_scatter(const char* const* action)
582 {
583   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
584         0 gather 68 68 0 0 0
585       where:
586         1) 68 is the sendcounts
587         2) 68 is the recvcounts
588         3) 0 is the root node
589         4) 0 is the send datatype id, see decode_datatype()
590         5) 0 is the recv datatype id, see decode_datatype()
591   */
592   CHECK_ACTION_PARAMS(action, 2, 3)
593   double clock                   = smpi_process()->simulated_elapsed();
594   int comm_size                  = MPI_COMM_WORLD->size();
595   int send_size                  = parse_double(action[2]);
596   int recv_size                  = parse_double(action[3]);
597   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
598   if (action[4] && action[5]) {
599     MPI_CURRENT_TYPE  = decode_datatype(action[5]);
600     MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
601   } else {
602     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
603   }
604   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
605   void* recv = nullptr;
606   int root   = 0;
607   if (action[4])
608     root   = atoi(action[4]);
609   int rank = MPI_COMM_WORLD->rank();
610
611   if (rank == root)
612     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
613
614   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
615                                                                         encode_datatype(MPI_CURRENT_TYPE),
616                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
617
618   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
619
620   TRACE_smpi_comm_out(smpi_process()->index());
621   log_timed_action(action, clock);
622 }
623
624 static void action_gatherv(const char *const *action) {
625   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
626        0 gather 68 68 10 10 10 0 0 0
627      where:
628        1) 68 is the sendcount
629        2) 68 10 10 10 is the recvcounts
630        3) 0 is the root node
631        4) 0 is the send datatype id, see decode_datatype()
632        5) 0 is the recv datatype id, see decode_datatype()
633   */
634   double clock = smpi_process()->simulated_elapsed();
635   int comm_size = MPI_COMM_WORLD->size();
636   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
637   int send_size = parse_double(action[2]);
638   int disps[comm_size];
639   int recvcounts[comm_size];
640   int recv_sum=0;
641
642   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
643   if(action[4+comm_size] && action[5+comm_size]) {
644     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
645     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
646   } else
647     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
648
649   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
650   void *recv = nullptr;
651   for(int i=0;i<comm_size;i++) {
652     recvcounts[i] = atoi(action[i+3]);
653     recv_sum=recv_sum+recvcounts[i];
654     disps[i]=0;
655   }
656
657   int root=atoi(action[3+comm_size]);
658   int rank = MPI_COMM_WORLD->rank();
659
660   if(rank==root)
661     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
662
663   std::vector<int>* trace_recvcounts = new std::vector<int>;
664   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
665     trace_recvcounts->push_back(recvcounts[i]);
666
667   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
668                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
669                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
670
671   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
672
673   TRACE_smpi_comm_out(smpi_process()->index());
674   log_timed_action (action, clock);
675 }
676
677 static void action_scatterv(const char* const* action)
678 {
679   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
680        0 gather 68 10 10 10 68 0 0 0
681      where:
682        1) 68 10 10 10 is the sendcounts
683        2) 68 is the recvcount
684        3) 0 is the root node
685        4) 0 is the send datatype id, see decode_datatype()
686        5) 0 is the recv datatype id, see decode_datatype()
687   */
688   double clock  = smpi_process()->simulated_elapsed();
689   int comm_size = MPI_COMM_WORLD->size();
690   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
691   int recv_size = parse_double(action[2 + comm_size]);
692   int disps[comm_size];
693   int sendcounts[comm_size];
694   int send_sum = 0;
695
696   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
697   if (action[4 + comm_size] && action[5 + comm_size]) {
698     MPI_CURRENT_TYPE  = decode_datatype(action[4 + comm_size]);
699     MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
700   } else
701     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
702
703   void* send = nullptr;
704   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
705   for (int i = 0; i < comm_size; i++) {
706     sendcounts[i] = atoi(action[i + 2]);
707     send_sum += sendcounts[i];
708     disps[i] = 0;
709   }
710
711   int root = atoi(action[3 + comm_size]);
712   int rank = MPI_COMM_WORLD->rank();
713
714   if (rank == root)
715     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
716
717   std::vector<int>* trace_sendcounts = new std::vector<int>;
718   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
719     trace_sendcounts->push_back(sendcounts[i]);
720
721   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
722                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
723                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
724
725   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
726
727   TRACE_smpi_comm_out(smpi_process()->index());
728   log_timed_action(action, clock);
729 }
730
731 static void action_reducescatter(const char *const *action) {
732  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
733       0 reduceScatter 275427 275427 275427 204020 11346849 0
734     where:
735       1) The first four values after the name of the action declare the recvcounts array
736       2) The value 11346849 is the amount of instructions
737       3) The last value corresponds to the datatype, see decode_datatype().
738 */
739   double clock = smpi_process()->simulated_elapsed();
740   int comm_size = MPI_COMM_WORLD->size();
741   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
742   int comp_size = parse_double(action[2+comm_size]);
743   int recvcounts[comm_size];
744   int rank = smpi_process()->index();
745   int size = 0;
746   std::vector<int>* trace_recvcounts = new std::vector<int>;
747   if(action[3+comm_size])
748     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
749   else
750     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
751
752   for(int i=0;i<comm_size;i++) {
753     recvcounts[i] = atoi(action[i+2]);
754     trace_recvcounts->push_back(recvcounts[i]);
755     size+=recvcounts[i];
756   }
757
758   TRACE_smpi_comm_in(rank, __FUNCTION__,
759                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
760                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
761                                                        encode_datatype(MPI_CURRENT_TYPE)));
762
763   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
764   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
765
766   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
767   smpi_execute_flops(comp_size);
768
769   TRACE_smpi_comm_out(rank);
770   log_timed_action (action, clock);
771 }
772
773 static void action_allgather(const char *const *action) {
774   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
775         0 allGather 275427 275427
776     where:
777         1) 275427 is the sendcount
778         2) 275427 is the recvcount
779         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
780   */
781   double clock = smpi_process()->simulated_elapsed();
782
783   CHECK_ACTION_PARAMS(action, 2, 2)
784   int sendcount=atoi(action[2]);
785   int recvcount=atoi(action[3]);
786
787   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
788
789   if(action[4] && action[5]) {
790     MPI_CURRENT_TYPE = decode_datatype(action[4]);
791     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
792   } else
793     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
794
795   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
796   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
797
798   int rank = smpi_process()->index();
799
800   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
801                                                                         encode_datatype(MPI_CURRENT_TYPE),
802                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
803
804   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
805
806   TRACE_smpi_comm_out(rank);
807   log_timed_action (action, clock);
808 }
809
810 static void action_allgatherv(const char *const *action) {
811   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
812         0 allGatherV 275427 275427 275427 275427 204020
813      where:
814         1) 275427 is the sendcount
815         2) The next four elements declare the recvcounts array
816         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
817   */
818   double clock = smpi_process()->simulated_elapsed();
819
820   int comm_size = MPI_COMM_WORLD->size();
821   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
822   int sendcount=atoi(action[2]);
823   int recvcounts[comm_size];
824   int disps[comm_size];
825   int recv_sum=0;
826   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
827
828   if(action[3+comm_size] && action[4+comm_size]) {
829     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
830     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
831   } else
832     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
833
834   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
835
836   for(int i=0;i<comm_size;i++) {
837     recvcounts[i] = atoi(action[i+3]);
838     recv_sum=recv_sum+recvcounts[i];
839     disps[i] = 0;
840   }
841   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
842
843   int rank = smpi_process()->index();
844
845   std::vector<int>* trace_recvcounts = new std::vector<int>;
846   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
847     trace_recvcounts->push_back(recvcounts[i]);
848
849   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
850                                              "allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
851                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
852
853   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
854                           MPI_COMM_WORLD);
855
856   TRACE_smpi_comm_out(rank);
857   log_timed_action (action, clock);
858 }
859
860 static void action_allToAllv(const char *const *action) {
861   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
862         0 allToAllV 100 1 7 10 12 100 1 70 10 5
863      where:
864         1) 100 is the size of the send buffer *sizeof(int),
865         2) 1 7 10 12 is the sendcounts array
866         3) 100*sizeof(int) is the size of the receiver buffer
867         4)  1 70 10 5 is the recvcounts array
868   */
869   double clock = smpi_process()->simulated_elapsed();
870
871   int comm_size = MPI_COMM_WORLD->size();
872   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
873   int send_size = 0;
874   int recv_size = 0;
875   int sendcounts[comm_size];
876   std::vector<int>* trace_sendcounts = new std::vector<int>;
877   int recvcounts[comm_size];
878   std::vector<int>* trace_recvcounts = new std::vector<int>;
879   int senddisps[comm_size];
880   int recvdisps[comm_size];
881
882   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
883
884   int send_buf_size=parse_double(action[2]);
885   int recv_buf_size=parse_double(action[3+comm_size]);
886   if(action[4+2*comm_size] && action[5+2*comm_size]) {
887     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
888     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
889   }
890   else
891     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
892
893   int rank       = smpi_process()->index();
894   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
895   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
896
897   for(int i=0;i<comm_size;i++) {
898     sendcounts[i] = atoi(action[i+3]);
899     trace_sendcounts->push_back(sendcounts[i]);
900     send_size += sendcounts[i];
901     recvcounts[i] = atoi(action[i+4+comm_size]);
902     trace_recvcounts->push_back(recvcounts[i]);
903     recv_size += recvcounts[i];
904     senddisps[i] = 0;
905     recvdisps[i] = 0;
906   }
907
908   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
909                                              "allToAllV", -1, send_size, trace_sendcounts, recv_size, trace_recvcounts,
910                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
911
912   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
913                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
914
915   TRACE_smpi_comm_out(rank);
916   log_timed_action (action, clock);
917 }
918
919 }} // namespace simgrid::smpi
920
921 /** @brief Only initialize the replay, don't do it for real */
922 void smpi_replay_init(int* argc, char*** argv)
923 {
924   simgrid::smpi::Process::init(argc, argv);
925   smpi_process()->mark_as_initialized();
926   smpi_process()->set_replaying(true);
927
928   int rank = smpi_process()->index();
929   TRACE_smpi_init(rank);
930   TRACE_smpi_computing_init(rank);
931   TRACE_smpi_comm_in(rank, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
932   TRACE_smpi_comm_out(rank);
933   xbt_replay_action_register("init",       simgrid::smpi::action_init);
934   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
935   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
936   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
937   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
938   xbt_replay_action_register("send",       simgrid::smpi::action_send);
939   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
940   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
941   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
942   xbt_replay_action_register("test",       simgrid::smpi::action_test);
943   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
944   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
945   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
946   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
947   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
948   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
949   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
950   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
951   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
952   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
953   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
954   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
955   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
956   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
957   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
958   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
959
960   //if we have a delayed start, sleep here.
961   if(*argc>2){
962     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
963     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
964     smpi_execute_flops(value);
965   } else {
966     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
967     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
968     smpi_execute_flops(0.0);
969   }
970 }
971
972 /** @brief actually run the replay after initialization */
973 void smpi_replay_main(int* argc, char*** argv)
974 {
975   simgrid::xbt::replay_runner(*argc, *argv);
976
977   /* and now, finalize everything */
978   /* One active process will stop. Decrease the counter*/
979   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
980   if (not get_reqq_self()->empty()) {
981     unsigned int count_requests=get_reqq_self()->size();
982     MPI_Request requests[count_requests];
983     MPI_Status status[count_requests];
984     unsigned int i=0;
985
986     for (auto const& req : *get_reqq_self()) {
987       requests[i] = req;
988       i++;
989     }
990     simgrid::smpi::Request::waitall(count_requests, requests, status);
991   }
992   delete get_reqq_self();
993   active_processes--;
994
995   if(active_processes==0){
996     /* Last process alive speaking: end the simulated timer */
997     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
998     xbt_free(sendbuffer);
999     xbt_free(recvbuffer);
1000   }
1001
1002   TRACE_smpi_comm_in(smpi_process()->index(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1003
1004   smpi_process()->finalize();
1005
1006   TRACE_smpi_comm_out(smpi_process()->index());
1007   TRACE_smpi_finalize(smpi_process()->index());
1008 }
1009
1010 /** @brief chain a replay initialization and a replay start */
1011 void smpi_replay_run(int* argc, char*** argv)
1012 {
1013   smpi_replay_init(argc, argv);
1014   smpi_replay_main(argc, argv);
1015 }