Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Remove unused KEY_SIZE macro from .cpp
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 using simgrid::s4u::Actor;
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 static int communicator_size = 0;
23 static int active_processes  = 0;
24 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
25
26 static MPI_Datatype MPI_DEFAULT_TYPE;
27 static MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size = 0;
30 static char* sendbuffer    = nullptr;
31 static int recvbuffer_size = 0;
32 static char* recvbuffer    = nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(Actor::self()->getPid());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({Actor::self()->getPid(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90
91 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
92 static MPI_Datatype decode_datatype(const char *const action)
93 {
94   switch(atoi(action)) {
95     case 0:
96       MPI_CURRENT_TYPE=MPI_DOUBLE;
97       break;
98     case 1:
99       MPI_CURRENT_TYPE=MPI_INT;
100       break;
101     case 2:
102       MPI_CURRENT_TYPE=MPI_CHAR;
103       break;
104     case 3:
105       MPI_CURRENT_TYPE=MPI_SHORT;
106       break;
107     case 4:
108       MPI_CURRENT_TYPE=MPI_LONG;
109       break;
110     case 5:
111       MPI_CURRENT_TYPE=MPI_FLOAT;
112       break;
113     case 6:
114       MPI_CURRENT_TYPE=MPI_BYTE;
115       break;
116     default:
117       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118       break;
119   }
120    return MPI_CURRENT_TYPE;
121 }
122
123 const char* encode_datatype(MPI_Datatype datatype)
124 {
125   if (datatype==MPI_BYTE)
126       return "";
127   if(datatype==MPI_DOUBLE)
128       return "0";
129   if(datatype==MPI_INT)
130       return "1";
131   if(datatype==MPI_CHAR)
132       return "2";
133   if(datatype==MPI_SHORT)
134       return "3";
135   if(datatype==MPI_LONG)
136     return "4";
137   if(datatype==MPI_FLOAT)
138       return "5";
139   // default - not implemented.
140   // do not warn here as we pass in this function even for other trace formats
141   return "-1";
142 }
143
144 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145     int i=0;\
146     while(action[i]!=nullptr)\
147      i++;\
148     if(i<mandatory+2)                                           \
149     THROWF(arg_error, 0, "%s replay failed.\n" \
150           "%d items were given on the line. First two should be process_id and action.  " \
151           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
152           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
153   }
154
155 namespace simgrid {
156 namespace smpi {
157
158 static void action_init(const char *const *action)
159 {
160   XBT_DEBUG("Initialize the counters");
161   CHECK_ACTION_PARAMS(action, 0, 1)
162   if(action[2])
163     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
164   else
165     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process()->simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* Nothing to do */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   communicator_size = parse_double(action[2]);
183   log_timed_action (action, smpi_process()->simulated_elapsed());
184 }
185
186 static void action_comm_split(const char *const *action)
187 {
188   log_timed_action (action, smpi_process()->simulated_elapsed());
189 }
190
191 static void action_comm_dup(const char *const *action)
192 {
193   log_timed_action (action, smpi_process()->simulated_elapsed());
194 }
195
196 static void action_compute(const char *const *action)
197 {
198   CHECK_ACTION_PARAMS(action, 1, 0)
199   double clock = smpi_process()->simulated_elapsed();
200   double flops= parse_double(action[2]);
201   int my_proc_id = Actor::self()->getPid();
202
203   TRACE_smpi_computing_in(my_proc_id, flops);
204   smpi_execute_flops(flops);
205   TRACE_smpi_computing_out(my_proc_id);
206
207   log_timed_action (action, clock);
208 }
209
210 static void action_send(const char *const *action)
211 {
212   CHECK_ACTION_PARAMS(action, 2, 1)
213   int to = atoi(action[2]);
214   double size=parse_double(action[3]);
215   double clock = smpi_process()->simulated_elapsed();
216
217   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
218
219   int my_proc_id = Actor::self()->getPid();
220   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
221
222   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
223                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
224   if (not TRACE_smpi_view_internals())
225     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
226
227   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
228
229   TRACE_smpi_comm_out(my_proc_id);
230
231   log_timed_action(action, clock);
232 }
233
234 static void action_Isend(const char *const *action)
235 {
236   CHECK_ACTION_PARAMS(action, 2, 1)
237   int to = atoi(action[2]);
238   double size=parse_double(action[3]);
239   double clock = smpi_process()->simulated_elapsed();
240
241   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
242
243   int my_proc_id = Actor::self()->getPid();
244   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
245   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
246                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
247   if (not TRACE_smpi_view_internals())
248     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
249
250   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
251
252   TRACE_smpi_comm_out(my_proc_id);
253
254   get_reqq_self()->push_back(request);
255
256   log_timed_action (action, clock);
257 }
258
259 static void action_recv(const char *const *action) {
260   CHECK_ACTION_PARAMS(action, 2, 1)
261   int from = atoi(action[2]);
262   double size=parse_double(action[3]);
263   double clock = smpi_process()->simulated_elapsed();
264   MPI_Status status;
265
266   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
267
268   int my_proc_id = Actor::self()->getPid();
269   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
270
271   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
272                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
273
274   //unknown size from the receiver point of view
275   if (size <= 0.0) {
276     Request::probe(from, 0, MPI_COMM_WORLD, &status);
277     size=status.count;
278   }
279
280   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
281
282   TRACE_smpi_comm_out(my_proc_id);
283   if (not TRACE_smpi_view_internals()) {
284     TRACE_smpi_recv(src_traced, my_proc_id, 0);
285   }
286
287   log_timed_action (action, clock);
288 }
289
290 static void action_Irecv(const char *const *action)
291 {
292   CHECK_ACTION_PARAMS(action, 2, 1)
293   int from = atoi(action[2]);
294   double size=parse_double(action[3]);
295   double clock = smpi_process()->simulated_elapsed();
296
297   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
298
299   int my_proc_id = Actor::self()->getPid();
300   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
301                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
302   MPI_Status status;
303   //unknow size from the receiver pov
304   if (size <= 0.0) {
305     Request::probe(from, 0, MPI_COMM_WORLD, &status);
306     size = status.count;
307   }
308
309   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
310
311   TRACE_smpi_comm_out(my_proc_id);
312   get_reqq_self()->push_back(request);
313
314   log_timed_action (action, clock);
315 }
316
317 static void action_test(const char* const* action)
318 {
319   CHECK_ACTION_PARAMS(action, 0, 0)
320   double clock = smpi_process()->simulated_elapsed();
321   MPI_Status status;
322
323   MPI_Request request = get_reqq_self()->back();
324   get_reqq_self()->pop_back();
325   //if request is null here, this may mean that a previous test has succeeded
326   //Different times in traced application and replayed version may lead to this
327   //In this case, ignore the extra calls.
328   if(request!=nullptr){
329     int my_proc_id = Actor::self()->getPid();
330     TRACE_smpi_testing_in(my_proc_id);
331
332     int flag = Request::test(&request, &status);
333
334     XBT_DEBUG("MPI_Test result: %d", flag);
335     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
336     get_reqq_self()->push_back(request);
337
338     TRACE_smpi_testing_out(my_proc_id);
339   }
340   log_timed_action (action, clock);
341 }
342
343 static void action_wait(const char *const *action){
344   CHECK_ACTION_PARAMS(action, 0, 0)
345   double clock = smpi_process()->simulated_elapsed();
346   MPI_Status status;
347
348   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
349       xbt_str_join_array(action," "));
350   MPI_Request request = get_reqq_self()->back();
351   get_reqq_self()->pop_back();
352
353   if (request==nullptr){
354     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
355     return;
356   }
357
358   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
359
360   MPI_Group group = request->comm()->group();
361   int src_traced = group->rank(request->src());
362   int dst_traced = group->rank(request->dst());
363   int is_wait_for_receive = (request->flags() & RECV);
364   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
365
366   Request::wait(&request, &status);
367
368   TRACE_smpi_comm_out(rank);
369   if (is_wait_for_receive)
370     TRACE_smpi_recv(src_traced, dst_traced, 0);
371   log_timed_action (action, clock);
372 }
373
374 static void action_waitall(const char *const *action){
375   CHECK_ACTION_PARAMS(action, 0, 0)
376   double clock = smpi_process()->simulated_elapsed();
377   const unsigned int count_requests = get_reqq_self()->size();
378
379   if (count_requests>0) {
380     MPI_Status status[count_requests];
381
382     int my_proc_id_traced = Actor::self()->getPid();
383     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
384                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
385     int recvs_snd[count_requests];
386     int recvs_rcv[count_requests];
387     for (unsigned int i = 0; i < count_requests; i++) {
388       const auto& req = (*get_reqq_self())[i];
389       if (req && (req->flags() & RECV)) {
390         recvs_snd[i] = req->src();
391         recvs_rcv[i] = req->dst();
392       } else
393         recvs_snd[i] = -100;
394    }
395    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
396
397    for (unsigned i = 0; i < count_requests; i++) {
398      if (recvs_snd[i]!=-100)
399        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
400    }
401    TRACE_smpi_comm_out(my_proc_id_traced);
402   }
403   log_timed_action (action, clock);
404 }
405
406 static void action_barrier(const char *const *action){
407   double clock = smpi_process()->simulated_elapsed();
408   int my_proc_id = Actor::self()->getPid();
409   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
410
411   Colls::barrier(MPI_COMM_WORLD);
412
413   TRACE_smpi_comm_out(my_proc_id);
414   log_timed_action (action, clock);
415 }
416
417 static void action_bcast(const char *const *action)
418 {
419   CHECK_ACTION_PARAMS(action, 1, 2)
420   double size = parse_double(action[2]);
421   double clock = smpi_process()->simulated_elapsed();
422   int root     = (action[3]) ? atoi(action[3]) : 0;
423   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
424   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
425
426   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
427
428   int my_proc_id = Actor::self()->getPid();
429   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
430                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
431                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
432
433   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
434
435   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
436
437   TRACE_smpi_comm_out(my_proc_id);
438   log_timed_action (action, clock);
439 }
440
441 static void action_reduce(const char *const *action)
442 {
443   CHECK_ACTION_PARAMS(action, 2, 2)
444   double comm_size = parse_double(action[2]);
445   double comp_size = parse_double(action[3]);
446   double clock = smpi_process()->simulated_elapsed();
447   int root         = (action[4]) ? atoi(action[4]) : 0;
448
449   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
450
451   int my_proc_id = Actor::self()->getPid();
452   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
453                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
454                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
455
456   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
458   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
459   smpi_execute_flops(comp_size);
460
461   TRACE_smpi_comm_out(my_proc_id);
462   log_timed_action (action, clock);
463 }
464
465 static void action_allReduce(const char *const *action) {
466   CHECK_ACTION_PARAMS(action, 2, 1)
467   double comm_size = parse_double(action[2]);
468   double comp_size = parse_double(action[3]);
469
470   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
471
472   double clock = smpi_process()->simulated_elapsed();
473   int my_proc_id = Actor::self()->getPid();
474   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
475                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
476
477   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
479   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
480   smpi_execute_flops(comp_size);
481
482   TRACE_smpi_comm_out(my_proc_id);
483   log_timed_action (action, clock);
484 }
485
486 static void action_allToAll(const char *const *action) {
487   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
488   double clock = smpi_process()->simulated_elapsed();
489   int comm_size = MPI_COMM_WORLD->size();
490   int send_size = parse_double(action[2]);
491   int recv_size = parse_double(action[3]);
492   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
493   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
494
495   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
496   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
497
498   int my_proc_id = Actor::self()->getPid();
499   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
500                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
501                                                     encode_datatype(MPI_CURRENT_TYPE),
502                                                     encode_datatype(MPI_CURRENT_TYPE2)));
503
504   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
505
506   TRACE_smpi_comm_out(my_proc_id);
507   log_timed_action (action, clock);
508 }
509
510 static void action_gather(const char *const *action) {
511   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
512         0 gather 68 68 0 0 0
513       where:
514         1) 68 is the sendcounts
515         2) 68 is the recvcounts
516         3) 0 is the root node
517         4) 0 is the send datatype id, see decode_datatype()
518         5) 0 is the recv datatype id, see decode_datatype()
519   */
520   CHECK_ACTION_PARAMS(action, 2, 3)
521   double clock = smpi_process()->simulated_elapsed();
522   int comm_size = MPI_COMM_WORLD->size();
523   int send_size = parse_double(action[2]);
524   int recv_size = parse_double(action[3]);
525   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
526   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
527
528   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
529   void *recv = nullptr;
530   int root   = (action[4]) ? atoi(action[4]) : 0;
531   int rank = MPI_COMM_WORLD->rank();
532
533   if(rank==root)
534     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
535
536   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
537                                                                         encode_datatype(MPI_CURRENT_TYPE),
538                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
539
540   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
541
542   TRACE_smpi_comm_out(Actor::self()->getPid());
543   log_timed_action (action, clock);
544 }
545
546 static void action_scatter(const char* const* action)
547 {
548   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
549         0 gather 68 68 0 0 0
550       where:
551         1) 68 is the sendcounts
552         2) 68 is the recvcounts
553         3) 0 is the root node
554         4) 0 is the send datatype id, see decode_datatype()
555         5) 0 is the recv datatype id, see decode_datatype()
556   */
557   CHECK_ACTION_PARAMS(action, 2, 3)
558   double clock                   = smpi_process()->simulated_elapsed();
559   int comm_size                  = MPI_COMM_WORLD->size();
560   int send_size                  = parse_double(action[2]);
561   int recv_size                  = parse_double(action[3]);
562   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
563   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
564
565   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
566   void* recv = nullptr;
567   int root   = (action[4]) ? atoi(action[4]) : 0;
568   int rank = MPI_COMM_WORLD->rank();
569
570   if (rank == root)
571     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
572
573   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
574                                                                         encode_datatype(MPI_CURRENT_TYPE),
575                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
576
577   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
578
579   TRACE_smpi_comm_out(Actor::self()->getPid());
580   log_timed_action(action, clock);
581 }
582
583 static void action_gatherv(const char *const *action) {
584   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
585        0 gather 68 68 10 10 10 0 0 0
586      where:
587        1) 68 is the sendcount
588        2) 68 10 10 10 is the recvcounts
589        3) 0 is the root node
590        4) 0 is the send datatype id, see decode_datatype()
591        5) 0 is the recv datatype id, see decode_datatype()
592   */
593   double clock = smpi_process()->simulated_elapsed();
594   int comm_size = MPI_COMM_WORLD->size();
595   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
596   int send_size = parse_double(action[2]);
597   int disps[comm_size];
598   int recvcounts[comm_size];
599   int recv_sum=0;
600
601   MPI_CURRENT_TYPE =
602       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
603   MPI_Datatype MPI_CURRENT_TYPE2{
604       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
605
606   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
607   void *recv = nullptr;
608   for(int i=0;i<comm_size;i++) {
609     recvcounts[i] = atoi(action[i+3]);
610     recv_sum=recv_sum+recvcounts[i];
611     disps[i]=0;
612   }
613
614   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
615   int rank = MPI_COMM_WORLD->rank();
616
617   if(rank==root)
618     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
619
620   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
621
622   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
623                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
624                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
625
626   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
627
628   TRACE_smpi_comm_out(Actor::self()->getPid());
629   log_timed_action (action, clock);
630 }
631
632 static void action_scatterv(const char* const* action)
633 {
634   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
635        0 gather 68 10 10 10 68 0 0 0
636      where:
637        1) 68 10 10 10 is the sendcounts
638        2) 68 is the recvcount
639        3) 0 is the root node
640        4) 0 is the send datatype id, see decode_datatype()
641        5) 0 is the recv datatype id, see decode_datatype()
642   */
643   double clock  = smpi_process()->simulated_elapsed();
644   int comm_size = MPI_COMM_WORLD->size();
645   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
646   int recv_size = parse_double(action[2 + comm_size]);
647   int disps[comm_size];
648   int sendcounts[comm_size];
649   int send_sum = 0;
650
651   MPI_CURRENT_TYPE =
652       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
653   MPI_Datatype MPI_CURRENT_TYPE2{
654       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
655
656   void* send = nullptr;
657   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
658   for (int i = 0; i < comm_size; i++) {
659     sendcounts[i] = atoi(action[i + 2]);
660     send_sum += sendcounts[i];
661     disps[i] = 0;
662   }
663
664   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
665   int rank = MPI_COMM_WORLD->rank();
666
667   if (rank == root)
668     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
669
670   std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
671
672   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
673                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
674                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
675
676   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
677
678   TRACE_smpi_comm_out(Actor::self()->getPid());
679   log_timed_action(action, clock);
680 }
681
682 static void action_reducescatter(const char *const *action) {
683  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
684       0 reduceScatter 275427 275427 275427 204020 11346849 0
685     where:
686       1) The first four values after the name of the action declare the recvcounts array
687       2) The value 11346849 is the amount of instructions
688       3) The last value corresponds to the datatype, see decode_datatype().
689 */
690   double clock = smpi_process()->simulated_elapsed();
691   int comm_size = MPI_COMM_WORLD->size();
692   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
693   int comp_size = parse_double(action[2+comm_size]);
694   int recvcounts[comm_size];
695   int my_proc_id                     = Actor::self()->getPid();
696   int size = 0;
697   std::vector<int>* trace_recvcounts = new std::vector<int>;
698   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
699
700   for(int i=0;i<comm_size;i++) {
701     recvcounts[i] = atoi(action[i+2]);
702     trace_recvcounts->push_back(recvcounts[i]);
703     size+=recvcounts[i];
704   }
705
706   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
707                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
708                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
709                                                        encode_datatype(MPI_CURRENT_TYPE)));
710
711   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
712   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
713
714   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
715   smpi_execute_flops(comp_size);
716
717   TRACE_smpi_comm_out(my_proc_id);
718   log_timed_action (action, clock);
719 }
720
721 static void action_allgather(const char *const *action) {
722   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
723         0 allGather 275427 275427
724     where:
725         1) 275427 is the sendcount
726         2) 275427 is the recvcount
727         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
728   */
729   double clock = smpi_process()->simulated_elapsed();
730
731   CHECK_ACTION_PARAMS(action, 2, 2)
732   int sendcount=atoi(action[2]);
733   int recvcount=atoi(action[3]);
734
735   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
736   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
737
738   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
739   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
740
741   int my_proc_id = Actor::self()->getPid();
742
743   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
744                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
745                                                     encode_datatype(MPI_CURRENT_TYPE),
746                                                     encode_datatype(MPI_CURRENT_TYPE2)));
747
748   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
749
750   TRACE_smpi_comm_out(my_proc_id);
751   log_timed_action (action, clock);
752 }
753
754 static void action_allgatherv(const char *const *action) {
755   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
756         0 allGatherV 275427 275427 275427 275427 204020
757      where:
758         1) 275427 is the sendcount
759         2) The next four elements declare the recvcounts array
760         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
761   */
762   double clock = smpi_process()->simulated_elapsed();
763
764   int comm_size = MPI_COMM_WORLD->size();
765   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
766   int sendcount=atoi(action[2]);
767   int recvcounts[comm_size];
768   int disps[comm_size];
769   int recv_sum=0;
770
771   MPI_CURRENT_TYPE =
772       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
773   MPI_Datatype MPI_CURRENT_TYPE2{
774       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
775
776   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
777
778   for(int i=0;i<comm_size;i++) {
779     recvcounts[i] = atoi(action[i+3]);
780     recv_sum=recv_sum+recvcounts[i];
781     disps[i] = 0;
782   }
783   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
784
785   int my_proc_id = Actor::self()->getPid();
786
787   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
788
789   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
790                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
791                                                        encode_datatype(MPI_CURRENT_TYPE),
792                                                        encode_datatype(MPI_CURRENT_TYPE2)));
793
794   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
795                           MPI_COMM_WORLD);
796
797   TRACE_smpi_comm_out(my_proc_id);
798   log_timed_action (action, clock);
799 }
800
801 static void action_allToAllv(const char *const *action) {
802   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
803         0 allToAllV 100 1 7 10 12 100 1 70 10 5
804      where:
805         1) 100 is the size of the send buffer *sizeof(int),
806         2) 1 7 10 12 is the sendcounts array
807         3) 100*sizeof(int) is the size of the receiver buffer
808         4)  1 70 10 5 is the recvcounts array
809   */
810   double clock = smpi_process()->simulated_elapsed();
811
812   int comm_size = MPI_COMM_WORLD->size();
813   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
814   int send_size = 0;
815   int recv_size = 0;
816   int sendcounts[comm_size];
817   std::vector<int>* trace_sendcounts = new std::vector<int>;
818   int recvcounts[comm_size];
819   std::vector<int>* trace_recvcounts = new std::vector<int>;
820   int senddisps[comm_size];
821   int recvdisps[comm_size];
822
823   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
824                          ? decode_datatype(action[4 + 2 * comm_size])
825                          : MPI_DEFAULT_TYPE;
826   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
827                                      ? decode_datatype(action[5 + 2 * comm_size])
828                                      : MPI_DEFAULT_TYPE};
829
830   int send_buf_size=parse_double(action[2]);
831   int recv_buf_size=parse_double(action[3+comm_size]);
832   int my_proc_id = Actor::self()->getPid();
833   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
834   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
835
836   for(int i=0;i<comm_size;i++) {
837     sendcounts[i] = atoi(action[i+3]);
838     trace_sendcounts->push_back(sendcounts[i]);
839     send_size += sendcounts[i];
840     recvcounts[i] = atoi(action[i+4+comm_size]);
841     trace_recvcounts->push_back(recvcounts[i]);
842     recv_size += recvcounts[i];
843     senddisps[i] = 0;
844     recvdisps[i] = 0;
845   }
846
847   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
848                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
849                                                        trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
850                                                        encode_datatype(MPI_CURRENT_TYPE2)));
851
852   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
853                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
854
855   TRACE_smpi_comm_out(my_proc_id);
856   log_timed_action (action, clock);
857 }
858
859 }} // namespace simgrid::smpi
860
861 /** @brief Only initialize the replay, don't do it for real */
862 void smpi_replay_init(int* argc, char*** argv)
863 {
864   simgrid::smpi::Process::init(argc, argv);
865   smpi_process()->mark_as_initialized();
866   smpi_process()->set_replaying(true);
867
868   int my_proc_id = Actor::self()->getPid();
869   TRACE_smpi_init(my_proc_id);
870   TRACE_smpi_computing_init(my_proc_id);
871   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
872   TRACE_smpi_comm_out(my_proc_id);
873   xbt_replay_action_register("init",       simgrid::smpi::action_init);
874   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
875   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
876   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
877   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
878   xbt_replay_action_register("send",       simgrid::smpi::action_send);
879   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
880   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
881   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
882   xbt_replay_action_register("test",       simgrid::smpi::action_test);
883   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
884   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
885   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
886   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
887   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
888   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
889   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
890   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
891   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
892   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
893   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
894   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
895   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
896   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
897   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
898   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
899
900   //if we have a delayed start, sleep here.
901   if(*argc>2){
902     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
903     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
904     smpi_execute_flops(value);
905   } else {
906     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
907     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
908     smpi_execute_flops(0.0);
909   }
910 }
911
912 /** @brief actually run the replay after initialization */
913 void smpi_replay_main(int* argc, char*** argv)
914 {
915   simgrid::xbt::replay_runner(*argc, *argv);
916
917   /* and now, finalize everything */
918   /* One active process will stop. Decrease the counter*/
919   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
920   if (not get_reqq_self()->empty()) {
921     unsigned int count_requests=get_reqq_self()->size();
922     MPI_Request requests[count_requests];
923     MPI_Status status[count_requests];
924     unsigned int i=0;
925
926     for (auto const& req : *get_reqq_self()) {
927       requests[i] = req;
928       i++;
929     }
930     simgrid::smpi::Request::waitall(count_requests, requests, status);
931   }
932   delete get_reqq_self();
933   active_processes--;
934
935   if(active_processes==0){
936     /* Last process alive speaking: end the simulated timer */
937     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
938     xbt_free(sendbuffer);
939     xbt_free(recvbuffer);
940   }
941
942   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
943
944   smpi_process()->finalize();
945
946   TRACE_smpi_comm_out(Actor::self()->getPid());
947   TRACE_smpi_finalize(Actor::self()->getPid());
948 }
949
950 /** @brief chain a replay initialization and a replay start */
951 void smpi_replay_run(int* argc, char*** argv)
952 {
953   smpi_replay_init(argc, argv);
954   smpi_replay_main(argc, argv);
955 }