Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[MSG/Replay] Delete old msg/replay code
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30
31 class ReplayActionArg {
32   ReplayActionArg() {}
33 };
34
35 static void log_timed_action (const char *const *action, double clock){
36   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
37     char *name = xbt_str_join_array(action, " ");
38     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
39     xbt_free(name);
40   }
41 }
42
43 static std::vector<MPI_Request>* get_reqq_self()
44 {
45   return reqq.at(Actor::self()->getPid());
46 }
47
48 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 {
50    reqq.insert({Actor::self()->getPid(), mpi_request});
51 }
52
53 /* Helper function */
54 static double parse_double(const char *string)
55 {
56   char *endptr;
57   double value = strtod(string, &endptr);
58   if (*endptr != '\0')
59     THROWF(unknown_error, 0, "%s is not a double", string);
60   return value;
61 }
62
63
64 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
65 static MPI_Datatype decode_datatype(const char *const action)
66 {
67   return simgrid::smpi::Datatype::decode(action);
68 }
69
70 const char* encode_datatype(MPI_Datatype datatype)
71 {
72   if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
73     return "-1";
74
75   return datatype->encode();
76 }
77
78 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
79     int i=0;\
80     while(action[i]!=nullptr)\
81      i++;\
82     if(i<mandatory+2)                                           \
83     THROWF(arg_error, 0, "%s replay failed.\n" \
84           "%d items were given on the line. First two should be process_id and action.  " \
85           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
86           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
87   }
88
89 namespace simgrid {
90 namespace smpi {
91
92 static void action_init(const char *const *action)
93 {
94   XBT_DEBUG("Initialize the counters");
95   CHECK_ACTION_PARAMS(action, 0, 1)
96   if(action[2])
97     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
98   else
99     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
100
101   /* start a simulated timer */
102   smpi_process()->simulated_start();
103   /*initialize the number of active processes */
104   active_processes = smpi_process_count();
105
106   set_reqq_self(new std::vector<MPI_Request>);
107 }
108
109 static void action_finalize(const char *const *action)
110 {
111   /* Nothing to do */
112 }
113
114 static void action_comm_size(const char *const *action)
115 {
116   communicator_size = parse_double(action[2]);
117   log_timed_action (action, smpi_process()->simulated_elapsed());
118 }
119
120 static void action_comm_split(const char *const *action)
121 {
122   log_timed_action (action, smpi_process()->simulated_elapsed());
123 }
124
125 static void action_comm_dup(const char *const *action)
126 {
127   log_timed_action (action, smpi_process()->simulated_elapsed());
128 }
129
130 static void action_compute(const char *const *action)
131 {
132   CHECK_ACTION_PARAMS(action, 1, 0)
133   double clock = smpi_process()->simulated_elapsed();
134   double flops= parse_double(action[2]);
135   int my_proc_id = Actor::self()->getPid();
136
137   TRACE_smpi_computing_in(my_proc_id, flops);
138   smpi_execute_flops(flops);
139   TRACE_smpi_computing_out(my_proc_id);
140
141   log_timed_action (action, clock);
142 }
143
144 static void action_send(const char *const *action)
145 {
146   CHECK_ACTION_PARAMS(action, 2, 1)
147   int to       = std::stoi(action[2]);
148   double size=parse_double(action[3]);
149   double clock = smpi_process()->simulated_elapsed();
150
151   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
152
153   int my_proc_id = Actor::self()->getPid();
154   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
155
156   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
157                      new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
158   if (not TRACE_smpi_view_internals())
159     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
160
161   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
162
163   TRACE_smpi_comm_out(my_proc_id);
164
165   log_timed_action(action, clock);
166 }
167
168 static void action_Isend(const char *const *action)
169 {
170   CHECK_ACTION_PARAMS(action, 2, 1)
171   int to       = std::stoi(action[2]);
172   double size=parse_double(action[3]);
173   double clock = smpi_process()->simulated_elapsed();
174
175   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
176
177   int my_proc_id = Actor::self()->getPid();
178   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
179   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
180                      new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
181   if (not TRACE_smpi_view_internals())
182     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
183
184   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
185
186   TRACE_smpi_comm_out(my_proc_id);
187
188   get_reqq_self()->push_back(request);
189
190   log_timed_action (action, clock);
191 }
192
193 static void action_recv(const char *const *action) {
194   CHECK_ACTION_PARAMS(action, 2, 1)
195   int from     = std::stoi(action[2]);
196   double size=parse_double(action[3]);
197   double clock = smpi_process()->simulated_elapsed();
198   MPI_Status status;
199
200   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
201
202   int my_proc_id = Actor::self()->getPid();
203   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
204
205   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
206                      new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
207
208   //unknown size from the receiver point of view
209   if (size <= 0.0) {
210     Request::probe(from, 0, MPI_COMM_WORLD, &status);
211     size=status.count;
212   }
213
214   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
215
216   TRACE_smpi_comm_out(my_proc_id);
217   if (not TRACE_smpi_view_internals()) {
218     TRACE_smpi_recv(src_traced, my_proc_id, 0);
219   }
220
221   log_timed_action (action, clock);
222 }
223
224 static void action_Irecv(const char *const *action)
225 {
226   CHECK_ACTION_PARAMS(action, 2, 1)
227   int from     = std::stoi(action[2]);
228   double size=parse_double(action[3]);
229   double clock = smpi_process()->simulated_elapsed();
230
231   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
232
233   int my_proc_id = Actor::self()->getPid();
234   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
235                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
236   MPI_Status status;
237   //unknow size from the receiver pov
238   if (size <= 0.0) {
239     Request::probe(from, 0, MPI_COMM_WORLD, &status);
240     size = status.count;
241   }
242
243   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
244
245   TRACE_smpi_comm_out(my_proc_id);
246   get_reqq_self()->push_back(request);
247
248   log_timed_action (action, clock);
249 }
250
251 static void action_test(const char* const* action)
252 {
253   CHECK_ACTION_PARAMS(action, 0, 0)
254   double clock = smpi_process()->simulated_elapsed();
255   MPI_Status status;
256
257   MPI_Request request = get_reqq_self()->back();
258   get_reqq_self()->pop_back();
259   //if request is null here, this may mean that a previous test has succeeded
260   //Different times in traced application and replayed version may lead to this
261   //In this case, ignore the extra calls.
262   if(request!=nullptr){
263     int my_proc_id = Actor::self()->getPid();
264     TRACE_smpi_testing_in(my_proc_id);
265
266     int flag = Request::test(&request, &status);
267
268     XBT_DEBUG("MPI_Test result: %d", flag);
269     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
270     get_reqq_self()->push_back(request);
271
272     TRACE_smpi_testing_out(my_proc_id);
273   }
274   log_timed_action (action, clock);
275 }
276
277 static void action_wait(const char *const *action){
278   CHECK_ACTION_PARAMS(action, 0, 0)
279   double clock = smpi_process()->simulated_elapsed();
280   MPI_Status status;
281
282   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
283       xbt_str_join_array(action," "));
284   MPI_Request request = get_reqq_self()->back();
285   get_reqq_self()->pop_back();
286
287   if (request==nullptr){
288     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
289     return;
290   }
291
292   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
293
294   MPI_Group group = request->comm()->group();
295   int src_traced = group->rank(request->src());
296   int dst_traced = group->rank(request->dst());
297   int is_wait_for_receive = (request->flags() & RECV);
298   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
299
300   Request::wait(&request, &status);
301
302   TRACE_smpi_comm_out(rank);
303   if (is_wait_for_receive)
304     TRACE_smpi_recv(src_traced, dst_traced, 0);
305   log_timed_action (action, clock);
306 }
307
308 static void action_waitall(const char *const *action){
309   CHECK_ACTION_PARAMS(action, 0, 0)
310   double clock = smpi_process()->simulated_elapsed();
311   const unsigned int count_requests = get_reqq_self()->size();
312
313   if (count_requests>0) {
314     MPI_Status status[count_requests];
315
316     int my_proc_id_traced = Actor::self()->getPid();
317     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
318                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
319     int recvs_snd[count_requests];
320     int recvs_rcv[count_requests];
321     for (unsigned int i = 0; i < count_requests; i++) {
322       const auto& req = (*get_reqq_self())[i];
323       if (req && (req->flags() & RECV)) {
324         recvs_snd[i] = req->src();
325         recvs_rcv[i] = req->dst();
326       } else
327         recvs_snd[i] = -100;
328    }
329    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
330
331    for (unsigned i = 0; i < count_requests; i++) {
332      if (recvs_snd[i]!=-100)
333        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
334    }
335    TRACE_smpi_comm_out(my_proc_id_traced);
336   }
337   log_timed_action (action, clock);
338 }
339
340 static void action_barrier(const char *const *action){
341   double clock = smpi_process()->simulated_elapsed();
342   int my_proc_id = Actor::self()->getPid();
343   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
344
345   Colls::barrier(MPI_COMM_WORLD);
346
347   TRACE_smpi_comm_out(my_proc_id);
348   log_timed_action (action, clock);
349 }
350
351 static void action_bcast(const char *const *action)
352 {
353   CHECK_ACTION_PARAMS(action, 1, 2)
354   double size = parse_double(action[2]);
355   double clock = smpi_process()->simulated_elapsed();
356   int root     = (action[3]) ? std::stoi(action[3]) : 0;
357   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
358   MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
359
360   int my_proc_id = Actor::self()->getPid();
361   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
362                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
363                                                     -1, MPI_CURRENT_TYPE->encode(), ""));
364
365   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
366
367   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
368
369   TRACE_smpi_comm_out(my_proc_id);
370   log_timed_action (action, clock);
371 }
372
373 static void action_reduce(const char *const *action)
374 {
375   CHECK_ACTION_PARAMS(action, 2, 2)
376   double comm_size = parse_double(action[2]);
377   double comp_size = parse_double(action[3]);
378   double clock = smpi_process()->simulated_elapsed();
379   int root         = (action[4]) ? std::stoi(action[4]) : 0;
380
381   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
382
383   int my_proc_id = Actor::self()->getPid();
384   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
385                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
386                                                     comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
387
388   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
389   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
390   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
391   smpi_execute_flops(comp_size);
392
393   TRACE_smpi_comm_out(my_proc_id);
394   log_timed_action (action, clock);
395 }
396
397 static void action_allReduce(const char *const *action) {
398   CHECK_ACTION_PARAMS(action, 2, 1)
399   double comm_size = parse_double(action[2]);
400   double comp_size = parse_double(action[3]);
401
402   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
403
404   double clock = smpi_process()->simulated_elapsed();
405   int my_proc_id = Actor::self()->getPid();
406   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
407                                                                               MPI_CURRENT_TYPE->encode(), ""));
408
409   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
410   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
411   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
412   smpi_execute_flops(comp_size);
413
414   TRACE_smpi_comm_out(my_proc_id);
415   log_timed_action (action, clock);
416 }
417
418 static void action_allToAll(const char *const *action) {
419   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
420   double clock = smpi_process()->simulated_elapsed();
421   int comm_size = MPI_COMM_WORLD->size();
422   int send_size = parse_double(action[2]);
423   int recv_size = parse_double(action[3]);
424   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
425   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
426
427   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
428   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
429
430   int my_proc_id = Actor::self()->getPid();
431   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
432                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
433                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
434
435   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
436
437   TRACE_smpi_comm_out(my_proc_id);
438   log_timed_action (action, clock);
439 }
440
441 static void action_gather(const char *const *action) {
442   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
443         0 gather 68 68 0 0 0
444       where:
445         1) 68 is the sendcounts
446         2) 68 is the recvcounts
447         3) 0 is the root node
448         4) 0 is the send datatype id, see decode_datatype()
449         5) 0 is the recv datatype id, see decode_datatype()
450   */
451   CHECK_ACTION_PARAMS(action, 2, 3)
452   double clock = smpi_process()->simulated_elapsed();
453   int comm_size = MPI_COMM_WORLD->size();
454   int send_size = parse_double(action[2]);
455   int recv_size = parse_double(action[3]);
456   MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
457   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
458
459   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
460   void *recv = nullptr;
461   int root   = (action[4]) ? std::stoi(action[4]) : 0;
462   int rank = MPI_COMM_WORLD->rank();
463
464   if(rank==root)
465     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
466
467   TRACE_smpi_comm_in(rank, __FUNCTION__,
468                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
469                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
470
471   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
472
473   TRACE_smpi_comm_out(Actor::self()->getPid());
474   log_timed_action (action, clock);
475 }
476
477 static void action_scatter(const char* const* action)
478 {
479   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
480         0 gather 68 68 0 0 0
481       where:
482         1) 68 is the sendcounts
483         2) 68 is the recvcounts
484         3) 0 is the root node
485         4) 0 is the send datatype id, see decode_datatype()
486         5) 0 is the recv datatype id, see decode_datatype()
487   */
488   CHECK_ACTION_PARAMS(action, 2, 3)
489   double clock                   = smpi_process()->simulated_elapsed();
490   int comm_size                  = MPI_COMM_WORLD->size();
491   int send_size                  = parse_double(action[2]);
492   int recv_size                  = parse_double(action[3]);
493   MPI_Datatype MPI_CURRENT_TYPE  = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
494   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
495
496   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
497   void* recv = nullptr;
498   int root   = (action[4]) ? std::stoi(action[4]) : 0;
499   int rank = MPI_COMM_WORLD->rank();
500
501   if (rank == root)
502     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
503
504   TRACE_smpi_comm_in(rank, __FUNCTION__,
505                      new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
506                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
507
508   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
509
510   TRACE_smpi_comm_out(Actor::self()->getPid());
511   log_timed_action(action, clock);
512 }
513
514 static void action_gatherv(const char *const *action) {
515   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
516        0 gather 68 68 10 10 10 0 0 0
517      where:
518        1) 68 is the sendcount
519        2) 68 10 10 10 is the recvcounts
520        3) 0 is the root node
521        4) 0 is the send datatype id, see decode_datatype()
522        5) 0 is the recv datatype id, see decode_datatype()
523   */
524   double clock = smpi_process()->simulated_elapsed();
525   int comm_size = MPI_COMM_WORLD->size();
526   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
527   int send_size = parse_double(action[2]);
528   std::vector<int> disps(comm_size, 0);
529   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
530
531   MPI_Datatype MPI_CURRENT_TYPE =
532       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
533   MPI_Datatype MPI_CURRENT_TYPE2{
534       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
535
536   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
537   void *recv = nullptr;
538   for(int i=0;i<comm_size;i++) {
539     (*recvcounts)[i] = std::stoi(action[i + 3]);
540   }
541   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
542
543   int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
544   int rank = MPI_COMM_WORLD->rank();
545
546   if(rank==root)
547     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
548
549   TRACE_smpi_comm_in(rank, __FUNCTION__,
550                      new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
551                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
552
553   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
554                  MPI_COMM_WORLD);
555
556   TRACE_smpi_comm_out(Actor::self()->getPid());
557   log_timed_action (action, clock);
558 }
559
560 static void action_scatterv(const char* const* action)
561 {
562   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
563        0 gather 68 10 10 10 68 0 0 0
564      where:
565        1) 68 10 10 10 is the sendcounts
566        2) 68 is the recvcount
567        3) 0 is the root node
568        4) 0 is the send datatype id, see decode_datatype()
569        5) 0 is the recv datatype id, see decode_datatype()
570   */
571   double clock  = smpi_process()->simulated_elapsed();
572   int comm_size = MPI_COMM_WORLD->size();
573   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
574   int recv_size = parse_double(action[2 + comm_size]);
575   std::vector<int> disps(comm_size, 0);
576   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
577
578   MPI_Datatype MPI_CURRENT_TYPE =
579       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
580   MPI_Datatype MPI_CURRENT_TYPE2{
581       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
582
583   void* send = nullptr;
584   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
585   for (int i = 0; i < comm_size; i++) {
586     (*sendcounts)[i] = std::stoi(action[i + 2]);
587   }
588   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
589
590   int root = (action[3 + comm_size]) ? std::stoi(action[3 + comm_size]) : 0;
591   int rank = MPI_COMM_WORLD->rank();
592
593   if (rank == root)
594     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
595
596   TRACE_smpi_comm_in(rank, __FUNCTION__,
597                      new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
598                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
599
600   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
601                   MPI_COMM_WORLD);
602
603   TRACE_smpi_comm_out(Actor::self()->getPid());
604   log_timed_action(action, clock);
605 }
606
607 static void action_reducescatter(const char *const *action) {
608  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
609       0 reduceScatter 275427 275427 275427 204020 11346849 0
610     where:
611       1) The first four values after the name of the action declare the recvcounts array
612       2) The value 11346849 is the amount of instructions
613       3) The last value corresponds to the datatype, see decode_datatype().
614 */
615   double clock = smpi_process()->simulated_elapsed();
616   int comm_size = MPI_COMM_WORLD->size();
617   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
618   int comp_size = parse_double(action[2+comm_size]);
619   int my_proc_id                     = Actor::self()->getPid();
620   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
621   MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
622
623   for(int i=0;i<comm_size;i++) {
624     recvcounts->push_back(std::stoi(action[i + 2]));
625   }
626   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
627
628   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
629                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
630                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
631                                                        MPI_CURRENT_TYPE->encode()));
632
633   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
634   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
635
636   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
637   smpi_execute_flops(comp_size);
638
639   TRACE_smpi_comm_out(my_proc_id);
640   log_timed_action (action, clock);
641 }
642
643 static void action_allgather(const char *const *action) {
644   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
645         0 allGather 275427 275427
646     where:
647         1) 275427 is the sendcount
648         2) 275427 is the recvcount
649         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
650   */
651   double clock = smpi_process()->simulated_elapsed();
652
653   CHECK_ACTION_PARAMS(action, 2, 2)
654   int sendcount = std::stoi(action[2]);
655   int recvcount = std::stoi(action[3]);
656
657   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
658   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
659
660   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
661   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
662
663   int my_proc_id = Actor::self()->getPid();
664
665   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
666                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
667                                                     MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
668
669   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
670
671   TRACE_smpi_comm_out(my_proc_id);
672   log_timed_action (action, clock);
673 }
674
675 static void action_allgatherv(const char *const *action) {
676   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
677         0 allGatherV 275427 275427 275427 275427 204020
678      where:
679         1) 275427 is the sendcount
680         2) The next four elements declare the recvcounts array
681         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
682   */
683   double clock = smpi_process()->simulated_elapsed();
684
685   int comm_size = MPI_COMM_WORLD->size();
686   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
687   int sendcount = std::stoi(action[2]);
688   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
689   std::vector<int> disps(comm_size, 0);
690
691   int datatype_index = 0, disp_index = 0;
692   if (action[3 + 2 * comm_size]) { /* datatype + disp are specified */
693     datatype_index = 3 + comm_size;
694     disp_index     = datatype_index + 1;
695   } else if (action[3 + 2 * comm_size]) { /* disps specified; datatype is not specified; use the default one */
696     datatype_index = -1;
697     disp_index     = 3 + comm_size;
698   } else if (action[3 + comm_size]) { /* only datatype, no disp specified */
699     datatype_index = 3 + comm_size;
700   }
701
702   if (disp_index != 0) {
703     std::copy(action[disp_index], action[disp_index + comm_size], disps.begin());
704   }
705
706   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
707   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
708
709   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
710
711   for(int i=0;i<comm_size;i++) {
712     (*recvcounts)[i] = std::stoi(action[i + 3]);
713   }
714   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
715   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
716
717   int my_proc_id = Actor::self()->getPid();
718
719   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
720                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
721                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
722
723   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
724                     MPI_COMM_WORLD);
725
726   TRACE_smpi_comm_out(my_proc_id);
727   log_timed_action (action, clock);
728 }
729
730 static void action_allToAllv(const char *const *action) {
731   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
732         0 allToAllV 100 1 7 10 12 100 1 70 10 5
733      where:
734         1) 100 is the size of the send buffer *sizeof(int),
735         2) 1 7 10 12 is the sendcounts array
736         3) 100*sizeof(int) is the size of the receiver buffer
737         4)  1 70 10 5 is the recvcounts array
738   */
739   double clock = smpi_process()->simulated_elapsed();
740
741   int comm_size = MPI_COMM_WORLD->size();
742   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
743   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
744   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
745   std::vector<int> senddisps(comm_size, 0);
746   std::vector<int> recvdisps(comm_size, 0);
747
748   MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
749                                       ? decode_datatype(action[4 + 2 * comm_size])
750                                       : MPI_DEFAULT_TYPE;
751   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
752                                      ? decode_datatype(action[5 + 2 * comm_size])
753                                      : MPI_DEFAULT_TYPE};
754
755   int send_buf_size=parse_double(action[2]);
756   int recv_buf_size=parse_double(action[3+comm_size]);
757   int my_proc_id = Actor::self()->getPid();
758   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
759   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
760
761   for(int i=0;i<comm_size;i++) {
762     (*sendcounts)[i] = std::stoi(action[3 + i]);
763     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
764   }
765   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
766   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
767
768   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
769                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
770                                                        MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
771
772   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
773                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
774
775   TRACE_smpi_comm_out(my_proc_id);
776   log_timed_action (action, clock);
777 }
778
779 }} // namespace simgrid::smpi
780
781 /** @brief Only initialize the replay, don't do it for real */
782 void smpi_replay_init(int* argc, char*** argv)
783 {
784   simgrid::smpi::Process::init(argc, argv);
785   smpi_process()->mark_as_initialized();
786   smpi_process()->set_replaying(true);
787
788   int my_proc_id = Actor::self()->getPid();
789   TRACE_smpi_init(my_proc_id);
790   TRACE_smpi_computing_init(my_proc_id);
791   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
792   TRACE_smpi_comm_out(my_proc_id);
793   xbt_replay_action_register("init",       simgrid::smpi::action_init);
794   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
795   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
796   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
797   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
798   xbt_replay_action_register("send",       simgrid::smpi::action_send);
799   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
800   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
801   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
802   xbt_replay_action_register("test",       simgrid::smpi::action_test);
803   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
804   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
805   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
806   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
807   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
808   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
809   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
810   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
811   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
812   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
813   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
814   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
815   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
816   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
817   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
818   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
819
820   //if we have a delayed start, sleep here.
821   if(*argc>2){
822     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
823     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
824     smpi_execute_flops(value);
825   } else {
826     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
827     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
828     smpi_execute_flops(0.0);
829   }
830 }
831
832 /** @brief actually run the replay after initialization */
833 void smpi_replay_main(int* argc, char*** argv)
834 {
835   simgrid::xbt::replay_runner(*argc, *argv);
836
837   /* and now, finalize everything */
838   /* One active process will stop. Decrease the counter*/
839   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
840   if (not get_reqq_self()->empty()) {
841     unsigned int count_requests=get_reqq_self()->size();
842     MPI_Request requests[count_requests];
843     MPI_Status status[count_requests];
844     unsigned int i=0;
845
846     for (auto const& req : *get_reqq_self()) {
847       requests[i] = req;
848       i++;
849     }
850     simgrid::smpi::Request::waitall(count_requests, requests, status);
851   }
852   delete get_reqq_self();
853   active_processes--;
854
855   if(active_processes==0){
856     /* Last process alive speaking: end the simulated timer */
857     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
858     smpi_free_replay_tmp_buffers();
859   }
860
861   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
862
863   smpi_process()->finalize();
864
865   TRACE_smpi_comm_out(Actor::self()->getPid());
866   TRACE_smpi_finalize(Actor::self()->getPid());
867 }
868
869 /** @brief chain a replay initialization and a replay start */
870 void smpi_replay_run(int* argc, char*** argv)
871 {
872   smpi_replay_init(argc, argv);
873   smpi_replay_main(argc, argv);
874 }