Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
676685c42a05b2e6961fb333a32f01d5b89bbe79
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
31     int i=0;\
32     while(action[i]!=nullptr)\
33      i++;\
34     if(i<mandatory+2)                                           \
35     THROWF(arg_error, 0, "%s replay failed.\n" \
36           "%d items were given on the line. First two should be process_id and action.  " \
37           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
38           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
39   }
40
41 class ReplayActionArg {
42   ReplayActionArg() {}
43 };
44
45 static void log_timed_action (const char *const *action, double clock){
46   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
47     char *name = xbt_str_join_array(action, " ");
48     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
49     xbt_free(name);
50   }
51 }
52
53 static std::vector<MPI_Request>* get_reqq_self()
54 {
55   return reqq.at(Actor::self()->getPid());
56 }
57
58 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
59 {
60    reqq.insert({Actor::self()->getPid(), mpi_request});
61 }
62
63 /* Helper function */
64 static double parse_double(const char *string)
65 {
66   char *endptr;
67   double value = strtod(string, &endptr);
68   if (*endptr != '\0')
69     THROWF(unknown_error, 0, "%s is not a double", string);
70   return value;
71 }
72
73
74 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
75 static MPI_Datatype decode_datatype(const char *const action)
76 {
77   return simgrid::smpi::Datatype::decode(action);
78 }
79
80 const char* encode_datatype(MPI_Datatype datatype)
81 {
82   if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
83     return "-1";
84
85   return datatype->encode();
86 }
87
88 namespace simgrid {
89 namespace smpi {
90
91 static void action_init(const char *const *action)
92 {
93   XBT_DEBUG("Initialize the counters");
94   CHECK_ACTION_PARAMS(action, 0, 1)
95   if(action[2])
96     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
97   else
98     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
99
100   /* start a simulated timer */
101   smpi_process()->simulated_start();
102   /*initialize the number of active processes */
103   active_processes = smpi_process_count();
104
105   set_reqq_self(new std::vector<MPI_Request>);
106 }
107
108 static void action_finalize(const char *const *action)
109 {
110   /* Nothing to do */
111 }
112
113 static void action_comm_size(const char *const *action)
114 {
115   communicator_size = parse_double(action[2]);
116   log_timed_action (action, smpi_process()->simulated_elapsed());
117 }
118
119 static void action_comm_split(const char *const *action)
120 {
121   log_timed_action (action, smpi_process()->simulated_elapsed());
122 }
123
124 static void action_comm_dup(const char *const *action)
125 {
126   log_timed_action (action, smpi_process()->simulated_elapsed());
127 }
128
129 static void action_compute(const char *const *action)
130 {
131   CHECK_ACTION_PARAMS(action, 1, 0)
132   double clock = smpi_process()->simulated_elapsed();
133   double flops= parse_double(action[2]);
134   int my_proc_id = Actor::self()->getPid();
135
136   TRACE_smpi_computing_in(my_proc_id, flops);
137   smpi_execute_flops(flops);
138   TRACE_smpi_computing_out(my_proc_id);
139
140   log_timed_action (action, clock);
141 }
142
143 static void action_send(const char *const *action)
144 {
145   CHECK_ACTION_PARAMS(action, 2, 1)
146   int to       = std::stoi(action[2]);
147   double size=parse_double(action[3]);
148   double clock = smpi_process()->simulated_elapsed();
149
150   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
151
152   int my_proc_id = Actor::self()->getPid();
153   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
154
155   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
156                      new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
157   if (not TRACE_smpi_view_internals())
158     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
159
160   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
161
162   TRACE_smpi_comm_out(my_proc_id);
163
164   log_timed_action(action, clock);
165 }
166
167 static void action_Isend(const char *const *action)
168 {
169   CHECK_ACTION_PARAMS(action, 2, 1)
170   int to       = std::stoi(action[2]);
171   double size=parse_double(action[3]);
172   double clock = smpi_process()->simulated_elapsed();
173
174   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
175
176   int my_proc_id = Actor::self()->getPid();
177   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
178   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
179                      new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
180   if (not TRACE_smpi_view_internals())
181     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
182
183   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
184
185   TRACE_smpi_comm_out(my_proc_id);
186
187   get_reqq_self()->push_back(request);
188
189   log_timed_action (action, clock);
190 }
191
192 static void action_recv(const char *const *action) {
193   CHECK_ACTION_PARAMS(action, 2, 1)
194   int from     = std::stoi(action[2]);
195   double size=parse_double(action[3]);
196   double clock = smpi_process()->simulated_elapsed();
197   MPI_Status status;
198
199   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
200
201   int my_proc_id = Actor::self()->getPid();
202   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
203
204   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
205                      new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
206
207   //unknown size from the receiver point of view
208   if (size <= 0.0) {
209     Request::probe(from, 0, MPI_COMM_WORLD, &status);
210     size=status.count;
211   }
212
213   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
214
215   TRACE_smpi_comm_out(my_proc_id);
216   if (not TRACE_smpi_view_internals()) {
217     TRACE_smpi_recv(src_traced, my_proc_id, 0);
218   }
219
220   log_timed_action (action, clock);
221 }
222
223 static void action_Irecv(const char *const *action)
224 {
225   CHECK_ACTION_PARAMS(action, 2, 1)
226   int from     = std::stoi(action[2]);
227   double size=parse_double(action[3]);
228   double clock = smpi_process()->simulated_elapsed();
229
230   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
231
232   int my_proc_id = Actor::self()->getPid();
233   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
234                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
235   MPI_Status status;
236   //unknow size from the receiver pov
237   if (size <= 0.0) {
238     Request::probe(from, 0, MPI_COMM_WORLD, &status);
239     size = status.count;
240   }
241
242   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
243
244   TRACE_smpi_comm_out(my_proc_id);
245   get_reqq_self()->push_back(request);
246
247   log_timed_action (action, clock);
248 }
249
250 static void action_test(const char* const* action)
251 {
252   CHECK_ACTION_PARAMS(action, 0, 0)
253   double clock = smpi_process()->simulated_elapsed();
254   MPI_Status status;
255
256   MPI_Request request = get_reqq_self()->back();
257   get_reqq_self()->pop_back();
258   //if request is null here, this may mean that a previous test has succeeded
259   //Different times in traced application and replayed version may lead to this
260   //In this case, ignore the extra calls.
261   if(request!=nullptr){
262     int my_proc_id = Actor::self()->getPid();
263     TRACE_smpi_testing_in(my_proc_id);
264
265     int flag = Request::test(&request, &status);
266
267     XBT_DEBUG("MPI_Test result: %d", flag);
268     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
269     get_reqq_self()->push_back(request);
270
271     TRACE_smpi_testing_out(my_proc_id);
272   }
273   log_timed_action (action, clock);
274 }
275
276 static void action_wait(const char *const *action){
277   CHECK_ACTION_PARAMS(action, 0, 0)
278   double clock = smpi_process()->simulated_elapsed();
279   MPI_Status status;
280
281   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
282       xbt_str_join_array(action," "));
283   MPI_Request request = get_reqq_self()->back();
284   get_reqq_self()->pop_back();
285
286   if (request==nullptr){
287     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
288     return;
289   }
290
291   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
292
293   MPI_Group group = request->comm()->group();
294   int src_traced = group->rank(request->src());
295   int dst_traced = group->rank(request->dst());
296   int is_wait_for_receive = (request->flags() & RECV);
297   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
298
299   Request::wait(&request, &status);
300
301   TRACE_smpi_comm_out(rank);
302   if (is_wait_for_receive)
303     TRACE_smpi_recv(src_traced, dst_traced, 0);
304   log_timed_action (action, clock);
305 }
306
307 static void action_waitall(const char *const *action){
308   CHECK_ACTION_PARAMS(action, 0, 0)
309   double clock = smpi_process()->simulated_elapsed();
310   const unsigned int count_requests = get_reqq_self()->size();
311
312   if (count_requests>0) {
313     MPI_Status status[count_requests];
314
315     int my_proc_id_traced = Actor::self()->getPid();
316     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
317                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
318     int recvs_snd[count_requests];
319     int recvs_rcv[count_requests];
320     for (unsigned int i = 0; i < count_requests; i++) {
321       const auto& req = (*get_reqq_self())[i];
322       if (req && (req->flags() & RECV)) {
323         recvs_snd[i] = req->src();
324         recvs_rcv[i] = req->dst();
325       } else
326         recvs_snd[i] = -100;
327    }
328    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
329
330    for (unsigned i = 0; i < count_requests; i++) {
331      if (recvs_snd[i]!=-100)
332        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
333    }
334    TRACE_smpi_comm_out(my_proc_id_traced);
335   }
336   log_timed_action (action, clock);
337 }
338
339 static void action_barrier(const char *const *action){
340   double clock = smpi_process()->simulated_elapsed();
341   int my_proc_id = Actor::self()->getPid();
342   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
343
344   Colls::barrier(MPI_COMM_WORLD);
345
346   TRACE_smpi_comm_out(my_proc_id);
347   log_timed_action (action, clock);
348 }
349
350 static void action_bcast(const char *const *action)
351 {
352   CHECK_ACTION_PARAMS(action, 1, 2)
353   double size = parse_double(action[2]);
354   double clock = smpi_process()->simulated_elapsed();
355   int root     = (action[3]) ? std::stoi(action[3]) : 0;
356   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
357   MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
358
359   int my_proc_id = Actor::self()->getPid();
360   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
361                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
362                                                     -1, MPI_CURRENT_TYPE->encode(), ""));
363
364   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
365
366   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
367
368   TRACE_smpi_comm_out(my_proc_id);
369   log_timed_action (action, clock);
370 }
371
372 static void action_reduce(const char *const *action)
373 {
374   CHECK_ACTION_PARAMS(action, 2, 2)
375   double comm_size = parse_double(action[2]);
376   double comp_size = parse_double(action[3]);
377   double clock = smpi_process()->simulated_elapsed();
378   int root         = (action[4]) ? std::stoi(action[4]) : 0;
379
380   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
381
382   int my_proc_id = Actor::self()->getPid();
383   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
384                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
385                                                     comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
386
387   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
388   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
389   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
390   smpi_execute_flops(comp_size);
391
392   TRACE_smpi_comm_out(my_proc_id);
393   log_timed_action (action, clock);
394 }
395
396 static void action_allReduce(const char *const *action) {
397   CHECK_ACTION_PARAMS(action, 2, 1)
398   double comm_size = parse_double(action[2]);
399   double comp_size = parse_double(action[3]);
400
401   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
402
403   double clock = smpi_process()->simulated_elapsed();
404   int my_proc_id = Actor::self()->getPid();
405   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
406                                                                               MPI_CURRENT_TYPE->encode(), ""));
407
408   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
409   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
410   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
411   smpi_execute_flops(comp_size);
412
413   TRACE_smpi_comm_out(my_proc_id);
414   log_timed_action (action, clock);
415 }
416
417 static void action_allToAll(const char *const *action) {
418   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
419   double clock = smpi_process()->simulated_elapsed();
420   int comm_size = MPI_COMM_WORLD->size();
421   int send_size = parse_double(action[2]);
422   int recv_size = parse_double(action[3]);
423   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
424   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
425
426   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
427   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
428
429   int my_proc_id = Actor::self()->getPid();
430   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
431                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
432                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
433
434   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
435
436   TRACE_smpi_comm_out(my_proc_id);
437   log_timed_action (action, clock);
438 }
439
440 static void action_gather(const char *const *action) {
441   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
442         0 gather 68 68 0 0 0
443       where:
444         1) 68 is the sendcounts
445         2) 68 is the recvcounts
446         3) 0 is the root node
447         4) 0 is the send datatype id, see decode_datatype()
448         5) 0 is the recv datatype id, see decode_datatype()
449   */
450   CHECK_ACTION_PARAMS(action, 2, 3)
451   double clock = smpi_process()->simulated_elapsed();
452   int comm_size = MPI_COMM_WORLD->size();
453   int send_size = parse_double(action[2]);
454   int recv_size = parse_double(action[3]);
455   MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
456   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
457
458   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
459   void *recv = nullptr;
460   int root   = (action[4]) ? std::stoi(action[4]) : 0;
461   int rank = MPI_COMM_WORLD->rank();
462
463   if(rank==root)
464     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
465
466   TRACE_smpi_comm_in(rank, __FUNCTION__,
467                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
468                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
469
470   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
471
472   TRACE_smpi_comm_out(Actor::self()->getPid());
473   log_timed_action (action, clock);
474 }
475
476 static void action_scatter(const char* const* action)
477 {
478   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
479         0 gather 68 68 0 0 0
480       where:
481         1) 68 is the sendcounts
482         2) 68 is the recvcounts
483         3) 0 is the root node
484         4) 0 is the send datatype id, see decode_datatype()
485         5) 0 is the recv datatype id, see decode_datatype()
486   */
487   CHECK_ACTION_PARAMS(action, 2, 3)
488   double clock                   = smpi_process()->simulated_elapsed();
489   int comm_size                  = MPI_COMM_WORLD->size();
490   int send_size                  = parse_double(action[2]);
491   int recv_size                  = parse_double(action[3]);
492   MPI_Datatype MPI_CURRENT_TYPE  = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
493   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
494
495   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
496   void* recv = nullptr;
497   int root   = (action[4]) ? std::stoi(action[4]) : 0;
498   int rank = MPI_COMM_WORLD->rank();
499
500   if (rank == root)
501     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
502
503   TRACE_smpi_comm_in(rank, __FUNCTION__,
504                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
505                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
506
507   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
508
509   TRACE_smpi_comm_out(Actor::self()->getPid());
510   log_timed_action(action, clock);
511 }
512
513 static void action_gatherv(const char *const *action) {
514   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
515        0 gather 68 68 10 10 10 0 0 0
516      where:
517        1) 68 is the sendcount
518        2) 68 10 10 10 is the recvcounts
519        3) 0 is the root node
520        4) 0 is the send datatype id, see decode_datatype()
521        5) 0 is the recv datatype id, see decode_datatype()
522   */
523   double clock = smpi_process()->simulated_elapsed();
524   int comm_size = MPI_COMM_WORLD->size();
525   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
526   int send_size = parse_double(action[2]);
527   std::vector<int> disps(comm_size, 0);
528   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
529
530   MPI_Datatype MPI_CURRENT_TYPE =
531       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
532   MPI_Datatype MPI_CURRENT_TYPE2{
533       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
534
535   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
536   void *recv = nullptr;
537   for(int i=0;i<comm_size;i++) {
538     (*recvcounts)[i] = std::stoi(action[i + 3]);
539   }
540   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
541
542   int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
543   int rank = MPI_COMM_WORLD->rank();
544
545   if(rank==root)
546     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
547
548   TRACE_smpi_comm_in(rank, __FUNCTION__,
549                      new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
550                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
551
552   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
553                  MPI_COMM_WORLD);
554
555   TRACE_smpi_comm_out(Actor::self()->getPid());
556   log_timed_action (action, clock);
557 }
558
559 static void action_scatterv(const char* const* action)
560 {
561   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
562        0 gather 68 10 10 10 68 0 0 0
563      where:
564        1) 68 10 10 10 is the sendcounts
565        2) 68 is the recvcount
566        3) 0 is the root node
567        4) 0 is the send datatype id, see decode_datatype()
568        5) 0 is the recv datatype id, see decode_datatype()
569   */
570   double clock  = smpi_process()->simulated_elapsed();
571   int comm_size = MPI_COMM_WORLD->size();
572   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
573   int recv_size = parse_double(action[2 + comm_size]);
574   std::vector<int> disps(comm_size, 0);
575   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
576
577   MPI_Datatype MPI_CURRENT_TYPE =
578       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
579   MPI_Datatype MPI_CURRENT_TYPE2{
580       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
581
582   void* send = nullptr;
583   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
584   for (int i = 0; i < comm_size; i++) {
585     (*sendcounts)[i] = std::stoi(action[i + 2]);
586   }
587   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
588
589   int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
590   int rank = MPI_COMM_WORLD->rank();
591
592   if (rank == root)
593     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
594
595   TRACE_smpi_comm_in(rank, __FUNCTION__,
596                      new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
597                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
598
599   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
600                   MPI_COMM_WORLD);
601
602   TRACE_smpi_comm_out(Actor::self()->getPid());
603   log_timed_action(action, clock);
604 }
605
606 static void action_reducescatter(const char *const *action) {
607  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
608       0 reduceScatter 275427 275427 275427 204020 11346849 0
609     where:
610       1) The first four values after the name of the action declare the recvcounts array
611       2) The value 11346849 is the amount of instructions
612       3) The last value corresponds to the datatype, see decode_datatype().
613 */
614   double clock = smpi_process()->simulated_elapsed();
615   int comm_size = MPI_COMM_WORLD->size();
616   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
617   int comp_size = parse_double(action[2+comm_size]);
618   int my_proc_id                     = Actor::self()->getPid();
619   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
620   MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
621
622   for(int i=0;i<comm_size;i++) {
623     recvcounts->push_back(std::stoi(action[i + 2]));
624   }
625   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
626
627   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
628                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
629                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
630                                                        MPI_CURRENT_TYPE->encode()));
631
632   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
633   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
634
635   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
636   smpi_execute_flops(comp_size);
637
638   TRACE_smpi_comm_out(my_proc_id);
639   log_timed_action (action, clock);
640 }
641
642 static void action_allgather(const char *const *action) {
643   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
644         0 allGather 275427 275427
645     where:
646         1) 275427 is the sendcount
647         2) 275427 is the recvcount
648         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
649   */
650   double clock = smpi_process()->simulated_elapsed();
651
652   CHECK_ACTION_PARAMS(action, 2, 2)
653   int sendcount = std::stoi(action[2]);
654   int recvcount = std::stoi(action[3]);
655
656   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
657   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
658
659   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
660   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
661
662   int my_proc_id = Actor::self()->getPid();
663
664   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
665                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
666                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
667
668   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
669
670   TRACE_smpi_comm_out(my_proc_id);
671   log_timed_action (action, clock);
672 }
673
674 static void action_allgatherv(const char *const *action) {
675   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
676         0 allGatherV 275427 275427 275427 275427 204020
677      where:
678         1) 275427 is the sendcount
679         2) The next four elements declare the recvcounts array
680         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
681   */
682   double clock = smpi_process()->simulated_elapsed();
683
684   int comm_size = MPI_COMM_WORLD->size();
685   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
686   int sendcount = std::stoi(action[2]);
687   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
688   std::vector<int> disps(comm_size, 0);
689
690   int datatype_index = 0, disp_index = 0;
691   if (action[3 + 2 * comm_size]) { /* datatype + disp are specified */
692     datatype_index = 3 + comm_size;
693     disp_index     = datatype_index + 1;
694   } else if (action[3 + 2 * comm_size]) { /* disps specified; datatype is not specified; use the default one */
695     datatype_index = -1;
696     disp_index     = 3 + comm_size;
697   } else if (action[3 + comm_size]) { /* only datatype, no disp specified */
698     datatype_index = 3 + comm_size;
699   }
700
701   if (disp_index != 0) {
702     std::copy(action[disp_index], action[disp_index + comm_size], disps.begin());
703   }
704
705   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
706   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
707
708   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
709
710   for(int i=0;i<comm_size;i++) {
711     (*recvcounts)[i] = std::stoi(action[i + 3]);
712   }
713   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
714   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
715
716   int my_proc_id = Actor::self()->getPid();
717
718   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
719                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
720                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
721
722   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
723                     MPI_COMM_WORLD);
724
725   TRACE_smpi_comm_out(my_proc_id);
726   log_timed_action (action, clock);
727 }
728
729 static void action_allToAllv(const char *const *action) {
730   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
731         0 allToAllV 100 1 7 10 12 100 1 70 10 5
732      where:
733         1) 100 is the size of the send buffer *sizeof(int),
734         2) 1 7 10 12 is the sendcounts array
735         3) 100*sizeof(int) is the size of the receiver buffer
736         4)  1 70 10 5 is the recvcounts array
737   */
738   double clock = smpi_process()->simulated_elapsed();
739
740   int comm_size = MPI_COMM_WORLD->size();
741   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
742   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
743   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
744   std::vector<int> senddisps(comm_size, 0);
745   std::vector<int> recvdisps(comm_size, 0);
746
747   MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
748                                       ? decode_datatype(action[4 + 2 * comm_size])
749                                       : MPI_DEFAULT_TYPE;
750   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
751                                      ? decode_datatype(action[5 + 2 * comm_size])
752                                      : MPI_DEFAULT_TYPE};
753
754   int send_buf_size=parse_double(action[2]);
755   int recv_buf_size=parse_double(action[3+comm_size]);
756   int my_proc_id = Actor::self()->getPid();
757   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
758   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
759
760   for(int i=0;i<comm_size;i++) {
761     (*sendcounts)[i] = std::stoi(action[3 + i]);
762     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
763   }
764   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
765   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
766
767   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
768                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
769                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
770
771   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
772                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
773
774   TRACE_smpi_comm_out(my_proc_id);
775   log_timed_action (action, clock);
776 }
777
778 }} // namespace simgrid::smpi
779
780 /** @brief Only initialize the replay, don't do it for real */
781 void smpi_replay_init(int* argc, char*** argv)
782 {
783   simgrid::smpi::Process::init(argc, argv);
784   smpi_process()->mark_as_initialized();
785   smpi_process()->set_replaying(true);
786
787   int my_proc_id = Actor::self()->getPid();
788   TRACE_smpi_init(my_proc_id);
789   TRACE_smpi_computing_init(my_proc_id);
790   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
791   TRACE_smpi_comm_out(my_proc_id);
792   xbt_replay_action_register("init",       simgrid::smpi::action_init);
793   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
794   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
795   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
796   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
797   xbt_replay_action_register("send",       simgrid::smpi::action_send);
798   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
799   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
800   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
801   xbt_replay_action_register("test",       simgrid::smpi::action_test);
802   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
803   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
804   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
805   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
806   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
807   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
808   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
809   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
810   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
811   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
812   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
813   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
814   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
815   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
816   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
817   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
818
819   //if we have a delayed start, sleep here.
820   if(*argc>2){
821     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
822     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
823     smpi_execute_flops(value);
824   } else {
825     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
826     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
827     smpi_execute_flops(0.0);
828   }
829 }
830
831 /** @brief actually run the replay after initialization */
832 void smpi_replay_main(int* argc, char*** argv)
833 {
834   simgrid::xbt::replay_runner(*argc, *argv);
835
836   /* and now, finalize everything */
837   /* One active process will stop. Decrease the counter*/
838   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
839   if (not get_reqq_self()->empty()) {
840     unsigned int count_requests=get_reqq_self()->size();
841     MPI_Request requests[count_requests];
842     MPI_Status status[count_requests];
843     unsigned int i=0;
844
845     for (auto const& req : *get_reqq_self()) {
846       requests[i] = req;
847       i++;
848     }
849     simgrid::smpi::Request::waitall(count_requests, requests, status);
850   }
851   delete get_reqq_self();
852   active_processes--;
853
854   if(active_processes==0){
855     /* Last process alive speaking: end the simulated timer */
856     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
857     smpi_free_replay_tmp_buffers();
858   }
859
860   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
861
862   smpi_process()->finalize();
863
864   TRACE_smpi_comm_out(Actor::self()->getPid());
865   TRACE_smpi_finalize(Actor::self()->getPid());
866 }
867
868 /** @brief chain a replay initialization and a replay start */
869 void smpi_replay_run(int* argc, char*** argv)
870 {
871   smpi_replay_init(argc, argv);
872   smpi_replay_main(argc, argv);
873 }