Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Use the right buffer (c&p error?)
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 static int sendbuffer_size = 0;
31 static char* sendbuffer    = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer    = nullptr;
34
35 class ReplayActionArg {
36   ReplayActionArg() {}
37 };
38
39 static void log_timed_action (const char *const *action, double clock){
40   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
41     char *name = xbt_str_join_array(action, " ");
42     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
43     xbt_free(name);
44   }
45 }
46
47 static std::vector<MPI_Request>* get_reqq_self()
48 {
49   return reqq.at(Actor::self()->getPid());
50 }
51
52 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
53 {
54    reqq.insert({Actor::self()->getPid(), mpi_request});
55 }
56
57 //allocate a single buffer for all sends, growing it if needed
58 void* smpi_get_tmp_sendbuffer(int size)
59 {
60   if (not smpi_process()->replaying())
61     return xbt_malloc(size);
62   if (sendbuffer_size<size){
63     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64     sendbuffer_size=size;
65   }
66   return sendbuffer;
67 }
68
69 //allocate a single buffer for all recv
70 void* smpi_get_tmp_recvbuffer(int size){
71   if (not smpi_process()->replaying())
72     return xbt_malloc(size);
73   if (recvbuffer_size<size){
74     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
75     recvbuffer_size=size;
76   }
77   return recvbuffer;
78 }
79
80 void smpi_free_tmp_buffer(void* buf){
81   if (not smpi_process()->replaying())
82     xbt_free(buf);
83 }
84
85 /* Helper function */
86 static double parse_double(const char *string)
87 {
88   char *endptr;
89   double value = strtod(string, &endptr);
90   if (*endptr != '\0')
91     THROWF(unknown_error, 0, "%s is not a double", string);
92   return value;
93 }
94
95
96 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
97 static MPI_Datatype decode_datatype(const char *const action)
98 {
99   return simgrid::smpi::Datatype::decode(action);
100 }
101
102 const char* encode_datatype(MPI_Datatype datatype)
103 {
104   if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
105     return "-1";
106
107   return datatype->encode();
108 }
109
110 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
111     int i=0;\
112     while(action[i]!=nullptr)\
113      i++;\
114     if(i<mandatory+2)                                           \
115     THROWF(arg_error, 0, "%s replay failed.\n" \
116           "%d items were given on the line. First two should be process_id and action.  " \
117           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
118           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
119   }
120
121 namespace simgrid {
122 namespace smpi {
123
124 static void action_init(const char *const *action)
125 {
126   XBT_DEBUG("Initialize the counters");
127   CHECK_ACTION_PARAMS(action, 0, 1)
128   if(action[2])
129     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
130   else
131     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
132
133   /* start a simulated timer */
134   smpi_process()->simulated_start();
135   /*initialize the number of active processes */
136   active_processes = smpi_process_count();
137
138   set_reqq_self(new std::vector<MPI_Request>);
139 }
140
141 static void action_finalize(const char *const *action)
142 {
143   /* Nothing to do */
144 }
145
146 static void action_comm_size(const char *const *action)
147 {
148   communicator_size = parse_double(action[2]);
149   log_timed_action (action, smpi_process()->simulated_elapsed());
150 }
151
152 static void action_comm_split(const char *const *action)
153 {
154   log_timed_action (action, smpi_process()->simulated_elapsed());
155 }
156
157 static void action_comm_dup(const char *const *action)
158 {
159   log_timed_action (action, smpi_process()->simulated_elapsed());
160 }
161
162 static void action_compute(const char *const *action)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   double clock = smpi_process()->simulated_elapsed();
166   double flops= parse_double(action[2]);
167   int my_proc_id = Actor::self()->getPid();
168
169   TRACE_smpi_computing_in(my_proc_id, flops);
170   smpi_execute_flops(flops);
171   TRACE_smpi_computing_out(my_proc_id);
172
173   log_timed_action (action, clock);
174 }
175
176 static void action_send(const char *const *action)
177 {
178   CHECK_ACTION_PARAMS(action, 2, 1)
179   int to       = std::stoi(action[2]);
180   double size=parse_double(action[3]);
181   double clock = smpi_process()->simulated_elapsed();
182
183   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
184
185   int my_proc_id = Actor::self()->getPid();
186   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
187
188   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
189                      new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
190   if (not TRACE_smpi_view_internals())
191     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
192
193   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
194
195   TRACE_smpi_comm_out(my_proc_id);
196
197   log_timed_action(action, clock);
198 }
199
200 static void action_Isend(const char *const *action)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 1)
203   int to       = std::stoi(action[2]);
204   double size=parse_double(action[3]);
205   double clock = smpi_process()->simulated_elapsed();
206
207   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
208
209   int my_proc_id = Actor::self()->getPid();
210   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
211   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
212                      new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
213   if (not TRACE_smpi_view_internals())
214     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
215
216   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
217
218   TRACE_smpi_comm_out(my_proc_id);
219
220   get_reqq_self()->push_back(request);
221
222   log_timed_action (action, clock);
223 }
224
225 static void action_recv(const char *const *action) {
226   CHECK_ACTION_PARAMS(action, 2, 1)
227   int from     = std::stoi(action[2]);
228   double size=parse_double(action[3]);
229   double clock = smpi_process()->simulated_elapsed();
230   MPI_Status status;
231
232   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
233
234   int my_proc_id = Actor::self()->getPid();
235   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
236
237   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
238                      new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
239
240   //unknown size from the receiver point of view
241   if (size <= 0.0) {
242     Request::probe(from, 0, MPI_COMM_WORLD, &status);
243     size=status.count;
244   }
245
246   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
247
248   TRACE_smpi_comm_out(my_proc_id);
249   if (not TRACE_smpi_view_internals()) {
250     TRACE_smpi_recv(src_traced, my_proc_id, 0);
251   }
252
253   log_timed_action (action, clock);
254 }
255
256 static void action_Irecv(const char *const *action)
257 {
258   CHECK_ACTION_PARAMS(action, 2, 1)
259   int from     = std::stoi(action[2]);
260   double size=parse_double(action[3]);
261   double clock = smpi_process()->simulated_elapsed();
262
263   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
264
265   int my_proc_id = Actor::self()->getPid();
266   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
267                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
268   MPI_Status status;
269   //unknow size from the receiver pov
270   if (size <= 0.0) {
271     Request::probe(from, 0, MPI_COMM_WORLD, &status);
272     size = status.count;
273   }
274
275   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
276
277   TRACE_smpi_comm_out(my_proc_id);
278   get_reqq_self()->push_back(request);
279
280   log_timed_action (action, clock);
281 }
282
283 static void action_test(const char* const* action)
284 {
285   CHECK_ACTION_PARAMS(action, 0, 0)
286   double clock = smpi_process()->simulated_elapsed();
287   MPI_Status status;
288
289   MPI_Request request = get_reqq_self()->back();
290   get_reqq_self()->pop_back();
291   //if request is null here, this may mean that a previous test has succeeded
292   //Different times in traced application and replayed version may lead to this
293   //In this case, ignore the extra calls.
294   if(request!=nullptr){
295     int my_proc_id = Actor::self()->getPid();
296     TRACE_smpi_testing_in(my_proc_id);
297
298     int flag = Request::test(&request, &status);
299
300     XBT_DEBUG("MPI_Test result: %d", flag);
301     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
302     get_reqq_self()->push_back(request);
303
304     TRACE_smpi_testing_out(my_proc_id);
305   }
306   log_timed_action (action, clock);
307 }
308
309 static void action_wait(const char *const *action){
310   CHECK_ACTION_PARAMS(action, 0, 0)
311   double clock = smpi_process()->simulated_elapsed();
312   MPI_Status status;
313
314   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
315       xbt_str_join_array(action," "));
316   MPI_Request request = get_reqq_self()->back();
317   get_reqq_self()->pop_back();
318
319   if (request==nullptr){
320     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
321     return;
322   }
323
324   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
325
326   MPI_Group group = request->comm()->group();
327   int src_traced = group->rank(request->src());
328   int dst_traced = group->rank(request->dst());
329   int is_wait_for_receive = (request->flags() & RECV);
330   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
331
332   Request::wait(&request, &status);
333
334   TRACE_smpi_comm_out(rank);
335   if (is_wait_for_receive)
336     TRACE_smpi_recv(src_traced, dst_traced, 0);
337   log_timed_action (action, clock);
338 }
339
340 static void action_waitall(const char *const *action){
341   CHECK_ACTION_PARAMS(action, 0, 0)
342   double clock = smpi_process()->simulated_elapsed();
343   const unsigned int count_requests = get_reqq_self()->size();
344
345   if (count_requests>0) {
346     MPI_Status status[count_requests];
347
348     int my_proc_id_traced = Actor::self()->getPid();
349     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
350                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
351     int recvs_snd[count_requests];
352     int recvs_rcv[count_requests];
353     for (unsigned int i = 0; i < count_requests; i++) {
354       const auto& req = (*get_reqq_self())[i];
355       if (req && (req->flags() & RECV)) {
356         recvs_snd[i] = req->src();
357         recvs_rcv[i] = req->dst();
358       } else
359         recvs_snd[i] = -100;
360    }
361    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
362
363    for (unsigned i = 0; i < count_requests; i++) {
364      if (recvs_snd[i]!=-100)
365        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
366    }
367    TRACE_smpi_comm_out(my_proc_id_traced);
368   }
369   log_timed_action (action, clock);
370 }
371
372 static void action_barrier(const char *const *action){
373   double clock = smpi_process()->simulated_elapsed();
374   int my_proc_id = Actor::self()->getPid();
375   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
376
377   Colls::barrier(MPI_COMM_WORLD);
378
379   TRACE_smpi_comm_out(my_proc_id);
380   log_timed_action (action, clock);
381 }
382
383 static void action_bcast(const char *const *action)
384 {
385   CHECK_ACTION_PARAMS(action, 1, 2)
386   double size = parse_double(action[2]);
387   double clock = smpi_process()->simulated_elapsed();
388   int root     = (action[3]) ? std::stoi(action[3]) : 0;
389   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
390   MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
391
392   int my_proc_id = Actor::self()->getPid();
393   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
394                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
395                                                     -1, MPI_CURRENT_TYPE->encode(), ""));
396
397   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
398
399   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
400
401   TRACE_smpi_comm_out(my_proc_id);
402   log_timed_action (action, clock);
403 }
404
405 static void action_reduce(const char *const *action)
406 {
407   CHECK_ACTION_PARAMS(action, 2, 2)
408   double comm_size = parse_double(action[2]);
409   double comp_size = parse_double(action[3]);
410   double clock = smpi_process()->simulated_elapsed();
411   int root         = (action[4]) ? std::stoi(action[4]) : 0;
412
413   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
414
415   int my_proc_id = Actor::self()->getPid();
416   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
417                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
418                                                     comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
419
420   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
421   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
422   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
423   smpi_execute_flops(comp_size);
424
425   TRACE_smpi_comm_out(my_proc_id);
426   log_timed_action (action, clock);
427 }
428
429 static void action_allReduce(const char *const *action) {
430   CHECK_ACTION_PARAMS(action, 2, 1)
431   double comm_size = parse_double(action[2]);
432   double comp_size = parse_double(action[3]);
433
434   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
435
436   double clock = smpi_process()->simulated_elapsed();
437   int my_proc_id = Actor::self()->getPid();
438   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
439                                                                               MPI_CURRENT_TYPE->encode(), ""));
440
441   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
442   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
443   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
444   smpi_execute_flops(comp_size);
445
446   TRACE_smpi_comm_out(my_proc_id);
447   log_timed_action (action, clock);
448 }
449
450 static void action_allToAll(const char *const *action) {
451   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
452   double clock = smpi_process()->simulated_elapsed();
453   int comm_size = MPI_COMM_WORLD->size();
454   int send_size = parse_double(action[2]);
455   int recv_size = parse_double(action[3]);
456   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
457   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
458
459   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
460   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
461
462   int my_proc_id = Actor::self()->getPid();
463   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
464                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
465                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
466
467   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
468
469   TRACE_smpi_comm_out(my_proc_id);
470   log_timed_action (action, clock);
471 }
472
473 static void action_gather(const char *const *action) {
474   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
475         0 gather 68 68 0 0 0
476       where:
477         1) 68 is the sendcounts
478         2) 68 is the recvcounts
479         3) 0 is the root node
480         4) 0 is the send datatype id, see decode_datatype()
481         5) 0 is the recv datatype id, see decode_datatype()
482   */
483   CHECK_ACTION_PARAMS(action, 2, 3)
484   double clock = smpi_process()->simulated_elapsed();
485   int comm_size = MPI_COMM_WORLD->size();
486   int send_size = parse_double(action[2]);
487   int recv_size = parse_double(action[3]);
488   MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
489   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
490
491   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
492   void *recv = nullptr;
493   int root   = (action[4]) ? std::stoi(action[4]) : 0;
494   int rank = MPI_COMM_WORLD->rank();
495
496   if(rank==root)
497     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
498
499   TRACE_smpi_comm_in(rank, __FUNCTION__,
500                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
501                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
502
503   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
504
505   TRACE_smpi_comm_out(Actor::self()->getPid());
506   log_timed_action (action, clock);
507 }
508
509 static void action_scatter(const char* const* action)
510 {
511   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
512         0 gather 68 68 0 0 0
513       where:
514         1) 68 is the sendcounts
515         2) 68 is the recvcounts
516         3) 0 is the root node
517         4) 0 is the send datatype id, see decode_datatype()
518         5) 0 is the recv datatype id, see decode_datatype()
519   */
520   CHECK_ACTION_PARAMS(action, 2, 3)
521   double clock                   = smpi_process()->simulated_elapsed();
522   int comm_size                  = MPI_COMM_WORLD->size();
523   int send_size                  = parse_double(action[2]);
524   int recv_size                  = parse_double(action[3]);
525   MPI_Datatype MPI_CURRENT_TYPE  = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
526   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
527
528   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
529   void* recv = nullptr;
530   int root   = (action[4]) ? std::stoi(action[4]) : 0;
531   int rank = MPI_COMM_WORLD->rank();
532
533   if (rank == root)
534     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
535
536   TRACE_smpi_comm_in(rank, __FUNCTION__,
537                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
538                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
539
540   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
541
542   TRACE_smpi_comm_out(Actor::self()->getPid());
543   log_timed_action(action, clock);
544 }
545
546 static void action_gatherv(const char *const *action) {
547   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
548        0 gather 68 68 10 10 10 0 0 0
549      where:
550        1) 68 is the sendcount
551        2) 68 10 10 10 is the recvcounts
552        3) 0 is the root node
553        4) 0 is the send datatype id, see decode_datatype()
554        5) 0 is the recv datatype id, see decode_datatype()
555   */
556   double clock = smpi_process()->simulated_elapsed();
557   int comm_size = MPI_COMM_WORLD->size();
558   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
559   int send_size = parse_double(action[2]);
560   std::vector<int> disps(comm_size, 0);
561   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
562
563   MPI_Datatype MPI_CURRENT_TYPE =
564       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
565   MPI_Datatype MPI_CURRENT_TYPE2{
566       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
567
568   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
569   void *recv = nullptr;
570   for(int i=0;i<comm_size;i++) {
571     (*recvcounts)[i] = std::stoi(action[i + 3]);
572   }
573   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
574
575   int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
576   int rank = MPI_COMM_WORLD->rank();
577
578   if(rank==root)
579     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
580
581   TRACE_smpi_comm_in(rank, __FUNCTION__,
582                      new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
583                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
584
585   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
586                  MPI_COMM_WORLD);
587
588   TRACE_smpi_comm_out(Actor::self()->getPid());
589   log_timed_action (action, clock);
590 }
591
592 static void action_scatterv(const char* const* action)
593 {
594   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
595        0 gather 68 10 10 10 68 0 0 0
596      where:
597        1) 68 10 10 10 is the sendcounts
598        2) 68 is the recvcount
599        3) 0 is the root node
600        4) 0 is the send datatype id, see decode_datatype()
601        5) 0 is the recv datatype id, see decode_datatype()
602   */
603   double clock  = smpi_process()->simulated_elapsed();
604   int comm_size = MPI_COMM_WORLD->size();
605   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
606   int recv_size = parse_double(action[2 + comm_size]);
607   std::vector<int> disps(comm_size, 0);
608   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
609
610   MPI_Datatype MPI_CURRENT_TYPE =
611       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
612   MPI_Datatype MPI_CURRENT_TYPE2{
613       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
614
615   void* send = nullptr;
616   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
617   for (int i = 0; i < comm_size; i++) {
618     (*sendcounts)[i] = std::stoi(action[i + 2]);
619   }
620   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
621
622   int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
623   int rank = MPI_COMM_WORLD->rank();
624
625   if (rank == root)
626     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
627
628   TRACE_smpi_comm_in(rank, __FUNCTION__,
629                      new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
630                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
631
632   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
633                   MPI_COMM_WORLD);
634
635   TRACE_smpi_comm_out(Actor::self()->getPid());
636   log_timed_action(action, clock);
637 }
638
639 static void action_reducescatter(const char *const *action) {
640  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
641       0 reduceScatter 275427 275427 275427 204020 11346849 0
642     where:
643       1) The first four values after the name of the action declare the recvcounts array
644       2) The value 11346849 is the amount of instructions
645       3) The last value corresponds to the datatype, see decode_datatype().
646 */
647   double clock = smpi_process()->simulated_elapsed();
648   int comm_size = MPI_COMM_WORLD->size();
649   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
650   int comp_size = parse_double(action[2+comm_size]);
651   int my_proc_id                     = Actor::self()->getPid();
652   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
653   MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
654
655   for(int i=0;i<comm_size;i++) {
656     recvcounts->push_back(std::stoi(action[i + 2]));
657   }
658   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
659
660   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
661                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
662                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
663                                                        MPI_CURRENT_TYPE->encode()));
664
665   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
666   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
667
668   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
669   smpi_execute_flops(comp_size);
670
671   TRACE_smpi_comm_out(my_proc_id);
672   log_timed_action (action, clock);
673 }
674
675 static void action_allgather(const char *const *action) {
676   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
677         0 allGather 275427 275427
678     where:
679         1) 275427 is the sendcount
680         2) 275427 is the recvcount
681         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
682   */
683   double clock = smpi_process()->simulated_elapsed();
684
685   CHECK_ACTION_PARAMS(action, 2, 2)
686   int sendcount = std::stoi(action[2]);
687   int recvcount = std::stoi(action[3]);
688
689   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
690   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
691
692   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
693   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
694
695   int my_proc_id = Actor::self()->getPid();
696
697   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
698                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
699                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
700
701   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
702
703   TRACE_smpi_comm_out(my_proc_id);
704   log_timed_action (action, clock);
705 }
706
707 static void action_allgatherv(const char *const *action) {
708   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
709         0 allGatherV 275427 275427 275427 275427 204020
710      where:
711         1) 275427 is the sendcount
712         2) The next four elements declare the recvcounts array
713         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
714   */
715   double clock = smpi_process()->simulated_elapsed();
716
717   int comm_size = MPI_COMM_WORLD->size();
718   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
719   int sendcount = std::stoi(action[2]);
720   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
721   std::vector<int> disps(comm_size, 0);
722
723   int datatype_index = 0, disp_index = 0;
724   if (action[3 + 2 * comm_size]) { /* datatype + disp are specified */
725     datatype_index = 3 + comm_size;
726     disp_index     = datatype_index + 1;
727   } else if (action[3 + 2 * comm_size]) { /* disps specified; datatype is not specified; use the default one */
728     datatype_index = -1;
729     disp_index     = 3 + comm_size;
730   } else if (action[3 + comm_size]) { /* only datatype, no disp specified */
731     datatype_index = 3 + comm_size;
732   }
733
734   if (disp_index != 0) {
735     std::copy(action[disp_index], action[disp_index + comm_size], disps.begin());
736   }
737
738   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
739   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
740
741   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
742
743   for(int i=0;i<comm_size;i++) {
744     (*recvcounts)[i] = std::stoi(action[i + 3]);
745   }
746   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
747   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
748
749   int my_proc_id = Actor::self()->getPid();
750
751   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
752                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
753                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
754
755   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
756                     MPI_COMM_WORLD);
757
758   TRACE_smpi_comm_out(my_proc_id);
759   log_timed_action (action, clock);
760 }
761
762 static void action_allToAllv(const char *const *action) {
763   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
764         0 allToAllV 100 1 7 10 12 100 1 70 10 5
765      where:
766         1) 100 is the size of the send buffer *sizeof(int),
767         2) 1 7 10 12 is the sendcounts array
768         3) 100*sizeof(int) is the size of the receiver buffer
769         4)  1 70 10 5 is the recvcounts array
770   */
771   double clock = smpi_process()->simulated_elapsed();
772
773   int comm_size = MPI_COMM_WORLD->size();
774   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
775   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
776   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
777   std::vector<int> senddisps(comm_size, 0);
778   std::vector<int> recvdisps(comm_size, 0);
779
780   MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
781                                       ? decode_datatype(action[4 + 2 * comm_size])
782                                       : MPI_DEFAULT_TYPE;
783   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
784                                      ? decode_datatype(action[5 + 2 * comm_size])
785                                      : MPI_DEFAULT_TYPE};
786
787   int send_buf_size=parse_double(action[2]);
788   int recv_buf_size=parse_double(action[3+comm_size]);
789   int my_proc_id = Actor::self()->getPid();
790   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
791   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
792
793   for(int i=0;i<comm_size;i++) {
794     (*sendcounts)[i] = std::stoi(action[3 + i]);
795     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
796   }
797   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
798   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
799
800   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
801                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
802                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
803
804   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
805                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
806
807   TRACE_smpi_comm_out(my_proc_id);
808   log_timed_action (action, clock);
809 }
810
811 }} // namespace simgrid::smpi
812
813 /** @brief Only initialize the replay, don't do it for real */
814 void smpi_replay_init(int* argc, char*** argv)
815 {
816   simgrid::smpi::Process::init(argc, argv);
817   smpi_process()->mark_as_initialized();
818   smpi_process()->set_replaying(true);
819
820   int my_proc_id = Actor::self()->getPid();
821   TRACE_smpi_init(my_proc_id);
822   TRACE_smpi_computing_init(my_proc_id);
823   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
824   TRACE_smpi_comm_out(my_proc_id);
825   xbt_replay_action_register("init",       simgrid::smpi::action_init);
826   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
827   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
828   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
829   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
830   xbt_replay_action_register("send",       simgrid::smpi::action_send);
831   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
832   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
833   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
834   xbt_replay_action_register("test",       simgrid::smpi::action_test);
835   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
836   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
837   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
838   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
839   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
840   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
841   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
842   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
843   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
844   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
845   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
846   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
847   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
848   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
849   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
850   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
851
852   //if we have a delayed start, sleep here.
853   if(*argc>2){
854     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
855     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
856     smpi_execute_flops(value);
857   } else {
858     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
859     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
860     smpi_execute_flops(0.0);
861   }
862 }
863
864 /** @brief actually run the replay after initialization */
865 void smpi_replay_main(int* argc, char*** argv)
866 {
867   simgrid::xbt::replay_runner(*argc, *argv);
868
869   /* and now, finalize everything */
870   /* One active process will stop. Decrease the counter*/
871   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
872   if (not get_reqq_self()->empty()) {
873     unsigned int count_requests=get_reqq_self()->size();
874     MPI_Request requests[count_requests];
875     MPI_Status status[count_requests];
876     unsigned int i=0;
877
878     for (auto const& req : *get_reqq_self()) {
879       requests[i] = req;
880       i++;
881     }
882     simgrid::smpi::Request::waitall(count_requests, requests, status);
883   }
884   delete get_reqq_self();
885   active_processes--;
886
887   if(active_processes==0){
888     /* Last process alive speaking: end the simulated timer */
889     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
890     xbt_free(sendbuffer);
891     xbt_free(recvbuffer);
892   }
893
894   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
895
896   smpi_process()->finalize();
897
898   TRACE_smpi_comm_out(Actor::self()->getPid());
899   TRACE_smpi_finalize(Actor::self()->getPid());
900 }
901
902 /** @brief chain a replay initialization and a replay start */
903 void smpi_replay_run(int* argc, char*** argv)
904 {
905   smpi_replay_init(argc, argv);
906   smpi_replay_main(argc, argv);
907 }