Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Datatypes: Make them all replayable
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <memory>
16 #include <numeric>
17 #include <unordered_map>
18 #include <vector>
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 static int communicator_size = 0;
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 static int sendbuffer_size = 0;
31 static char* sendbuffer    = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer    = nullptr;
34
35 class ReplayActionArg {
36   ReplayActionArg() {}
37 };
38
39 static void log_timed_action (const char *const *action, double clock){
40   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
41     char *name = xbt_str_join_array(action, " ");
42     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
43     xbt_free(name);
44   }
45 }
46
47 static std::vector<MPI_Request>* get_reqq_self()
48 {
49   return reqq.at(Actor::self()->getPid());
50 }
51
52 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
53 {
54    reqq.insert({Actor::self()->getPid(), mpi_request});
55 }
56
57 //allocate a single buffer for all sends, growing it if needed
58 void* smpi_get_tmp_sendbuffer(int size)
59 {
60   if (not smpi_process()->replaying())
61     return xbt_malloc(size);
62   if (sendbuffer_size<size){
63     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64     sendbuffer_size=size;
65   }
66   return sendbuffer;
67 }
68
69 //allocate a single buffer for all recv
70 void* smpi_get_tmp_recvbuffer(int size){
71   if (not smpi_process()->replaying())
72     return xbt_malloc(size);
73   if (recvbuffer_size<size){
74     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
75     recvbuffer_size=size;
76   }
77   return recvbuffer;
78 }
79
80 void smpi_free_tmp_buffer(void* buf){
81   if (not smpi_process()->replaying())
82     xbt_free(buf);
83 }
84
85 /* Helper function */
86 static double parse_double(const char *string)
87 {
88   char *endptr;
89   double value = strtod(string, &endptr);
90   if (*endptr != '\0')
91     THROWF(unknown_error, 0, "%s is not a double", string);
92   return value;
93 }
94
95
96 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
97 static MPI_Datatype decode_datatype(const char *const action)
98 {
99   return simgrid::smpi::Datatype::decode(action);
100 }
101
102 const char* encode_datatype(MPI_Datatype datatype)
103 {
104   if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
105     return "-1";
106
107   return datatype->encode();
108 }
109
110 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
111     int i=0;\
112     while(action[i]!=nullptr)\
113      i++;\
114     if(i<mandatory+2)                                           \
115     THROWF(arg_error, 0, "%s replay failed.\n" \
116           "%d items were given on the line. First two should be process_id and action.  " \
117           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
118           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
119   }
120
121 namespace simgrid {
122 namespace smpi {
123
124 static void action_init(const char *const *action)
125 {
126   XBT_DEBUG("Initialize the counters");
127   CHECK_ACTION_PARAMS(action, 0, 1)
128   if(action[2])
129     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
130   else
131     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
132
133   /* start a simulated timer */
134   smpi_process()->simulated_start();
135   /*initialize the number of active processes */
136   active_processes = smpi_process_count();
137
138   set_reqq_self(new std::vector<MPI_Request>);
139 }
140
141 static void action_finalize(const char *const *action)
142 {
143   /* Nothing to do */
144 }
145
146 static void action_comm_size(const char *const *action)
147 {
148   communicator_size = parse_double(action[2]);
149   log_timed_action (action, smpi_process()->simulated_elapsed());
150 }
151
152 static void action_comm_split(const char *const *action)
153 {
154   log_timed_action (action, smpi_process()->simulated_elapsed());
155 }
156
157 static void action_comm_dup(const char *const *action)
158 {
159   log_timed_action (action, smpi_process()->simulated_elapsed());
160 }
161
162 static void action_compute(const char *const *action)
163 {
164   CHECK_ACTION_PARAMS(action, 1, 0)
165   double clock = smpi_process()->simulated_elapsed();
166   double flops= parse_double(action[2]);
167   int my_proc_id = Actor::self()->getPid();
168
169   TRACE_smpi_computing_in(my_proc_id, flops);
170   smpi_execute_flops(flops);
171   TRACE_smpi_computing_out(my_proc_id);
172
173   log_timed_action (action, clock);
174 }
175
176 static void action_send(const char *const *action)
177 {
178   CHECK_ACTION_PARAMS(action, 2, 1)
179   int to = atoi(action[2]);
180   double size=parse_double(action[3]);
181   double clock = smpi_process()->simulated_elapsed();
182
183   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
184
185   int my_proc_id = Actor::self()->getPid();
186   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
187
188   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
189                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
190   if (not TRACE_smpi_view_internals())
191     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
192
193   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
194
195   TRACE_smpi_comm_out(my_proc_id);
196
197   log_timed_action(action, clock);
198 }
199
200 static void action_Isend(const char *const *action)
201 {
202   CHECK_ACTION_PARAMS(action, 2, 1)
203   int to = atoi(action[2]);
204   double size=parse_double(action[3]);
205   double clock = smpi_process()->simulated_elapsed();
206
207   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
208
209   int my_proc_id = Actor::self()->getPid();
210   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
211   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
212                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
213   if (not TRACE_smpi_view_internals())
214     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
215
216   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
217
218   TRACE_smpi_comm_out(my_proc_id);
219
220   get_reqq_self()->push_back(request);
221
222   log_timed_action (action, clock);
223 }
224
225 static void action_recv(const char *const *action) {
226   CHECK_ACTION_PARAMS(action, 2, 1)
227   int from = atoi(action[2]);
228   double size=parse_double(action[3]);
229   double clock = smpi_process()->simulated_elapsed();
230   MPI_Status status;
231
232   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
233
234   int my_proc_id = Actor::self()->getPid();
235   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
236
237   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
238                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
239
240   //unknown size from the receiver point of view
241   if (size <= 0.0) {
242     Request::probe(from, 0, MPI_COMM_WORLD, &status);
243     size=status.count;
244   }
245
246   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
247
248   TRACE_smpi_comm_out(my_proc_id);
249   if (not TRACE_smpi_view_internals()) {
250     TRACE_smpi_recv(src_traced, my_proc_id, 0);
251   }
252
253   log_timed_action (action, clock);
254 }
255
256 static void action_Irecv(const char *const *action)
257 {
258   CHECK_ACTION_PARAMS(action, 2, 1)
259   int from = atoi(action[2]);
260   double size=parse_double(action[3]);
261   double clock = smpi_process()->simulated_elapsed();
262
263   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
264
265   int my_proc_id = Actor::self()->getPid();
266   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
267                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
268   MPI_Status status;
269   //unknow size from the receiver pov
270   if (size <= 0.0) {
271     Request::probe(from, 0, MPI_COMM_WORLD, &status);
272     size = status.count;
273   }
274
275   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
276
277   TRACE_smpi_comm_out(my_proc_id);
278   get_reqq_self()->push_back(request);
279
280   log_timed_action (action, clock);
281 }
282
283 static void action_test(const char* const* action)
284 {
285   CHECK_ACTION_PARAMS(action, 0, 0)
286   double clock = smpi_process()->simulated_elapsed();
287   MPI_Status status;
288
289   MPI_Request request = get_reqq_self()->back();
290   get_reqq_self()->pop_back();
291   //if request is null here, this may mean that a previous test has succeeded
292   //Different times in traced application and replayed version may lead to this
293   //In this case, ignore the extra calls.
294   if(request!=nullptr){
295     int my_proc_id = Actor::self()->getPid();
296     TRACE_smpi_testing_in(my_proc_id);
297
298     int flag = Request::test(&request, &status);
299
300     XBT_DEBUG("MPI_Test result: %d", flag);
301     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
302     get_reqq_self()->push_back(request);
303
304     TRACE_smpi_testing_out(my_proc_id);
305   }
306   log_timed_action (action, clock);
307 }
308
309 static void action_wait(const char *const *action){
310   CHECK_ACTION_PARAMS(action, 0, 0)
311   double clock = smpi_process()->simulated_elapsed();
312   MPI_Status status;
313
314   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
315       xbt_str_join_array(action," "));
316   MPI_Request request = get_reqq_self()->back();
317   get_reqq_self()->pop_back();
318
319   if (request==nullptr){
320     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
321     return;
322   }
323
324   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
325
326   MPI_Group group = request->comm()->group();
327   int src_traced = group->rank(request->src());
328   int dst_traced = group->rank(request->dst());
329   int is_wait_for_receive = (request->flags() & RECV);
330   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
331
332   Request::wait(&request, &status);
333
334   TRACE_smpi_comm_out(rank);
335   if (is_wait_for_receive)
336     TRACE_smpi_recv(src_traced, dst_traced, 0);
337   log_timed_action (action, clock);
338 }
339
340 static void action_waitall(const char *const *action){
341   CHECK_ACTION_PARAMS(action, 0, 0)
342   double clock = smpi_process()->simulated_elapsed();
343   const unsigned int count_requests = get_reqq_self()->size();
344
345   if (count_requests>0) {
346     MPI_Status status[count_requests];
347
348     int my_proc_id_traced = Actor::self()->getPid();
349     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
350                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
351     int recvs_snd[count_requests];
352     int recvs_rcv[count_requests];
353     for (unsigned int i = 0; i < count_requests; i++) {
354       const auto& req = (*get_reqq_self())[i];
355       if (req && (req->flags() & RECV)) {
356         recvs_snd[i] = req->src();
357         recvs_rcv[i] = req->dst();
358       } else
359         recvs_snd[i] = -100;
360    }
361    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
362
363    for (unsigned i = 0; i < count_requests; i++) {
364      if (recvs_snd[i]!=-100)
365        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
366    }
367    TRACE_smpi_comm_out(my_proc_id_traced);
368   }
369   log_timed_action (action, clock);
370 }
371
372 static void action_barrier(const char *const *action){
373   double clock = smpi_process()->simulated_elapsed();
374   int my_proc_id = Actor::self()->getPid();
375   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
376
377   Colls::barrier(MPI_COMM_WORLD);
378
379   TRACE_smpi_comm_out(my_proc_id);
380   log_timed_action (action, clock);
381 }
382
383 static void action_bcast(const char *const *action)
384 {
385   CHECK_ACTION_PARAMS(action, 1, 2)
386   double size = parse_double(action[2]);
387   double clock = smpi_process()->simulated_elapsed();
388   int root     = (action[3]) ? atoi(action[3]) : 0;
389   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
390   MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
391
392   int my_proc_id = Actor::self()->getPid();
393   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
394                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
395                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
396
397   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
398
399   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
400
401   TRACE_smpi_comm_out(my_proc_id);
402   log_timed_action (action, clock);
403 }
404
405 static void action_reduce(const char *const *action)
406 {
407   CHECK_ACTION_PARAMS(action, 2, 2)
408   double comm_size = parse_double(action[2]);
409   double comp_size = parse_double(action[3]);
410   double clock = smpi_process()->simulated_elapsed();
411   int root         = (action[4]) ? atoi(action[4]) : 0;
412
413   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
414
415   int my_proc_id = Actor::self()->getPid();
416   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
417                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
418                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
419
420   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
421   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
422   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
423   smpi_execute_flops(comp_size);
424
425   TRACE_smpi_comm_out(my_proc_id);
426   log_timed_action (action, clock);
427 }
428
429 static void action_allReduce(const char *const *action) {
430   CHECK_ACTION_PARAMS(action, 2, 1)
431   double comm_size = parse_double(action[2]);
432   double comp_size = parse_double(action[3]);
433
434   MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
435
436   double clock = smpi_process()->simulated_elapsed();
437   int my_proc_id = Actor::self()->getPid();
438   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
439                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
440
441   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
442   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
443   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
444   smpi_execute_flops(comp_size);
445
446   TRACE_smpi_comm_out(my_proc_id);
447   log_timed_action (action, clock);
448 }
449
450 static void action_allToAll(const char *const *action) {
451   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
452   double clock = smpi_process()->simulated_elapsed();
453   int comm_size = MPI_COMM_WORLD->size();
454   int send_size = parse_double(action[2]);
455   int recv_size = parse_double(action[3]);
456   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
457   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
458
459   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
460   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
461
462   int my_proc_id = Actor::self()->getPid();
463   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
464                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
465                                                     encode_datatype(MPI_CURRENT_TYPE),
466                                                     encode_datatype(MPI_CURRENT_TYPE2)));
467
468   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
469
470   TRACE_smpi_comm_out(my_proc_id);
471   log_timed_action (action, clock);
472 }
473
474 static void action_gather(const char *const *action) {
475   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
476         0 gather 68 68 0 0 0
477       where:
478         1) 68 is the sendcounts
479         2) 68 is the recvcounts
480         3) 0 is the root node
481         4) 0 is the send datatype id, see decode_datatype()
482         5) 0 is the recv datatype id, see decode_datatype()
483   */
484   CHECK_ACTION_PARAMS(action, 2, 3)
485   double clock = smpi_process()->simulated_elapsed();
486   int comm_size = MPI_COMM_WORLD->size();
487   int send_size = parse_double(action[2]);
488   int recv_size = parse_double(action[3]);
489   MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
490   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
491
492   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
493   void *recv = nullptr;
494   int root   = (action[4]) ? atoi(action[4]) : 0;
495   int rank = MPI_COMM_WORLD->rank();
496
497   if(rank==root)
498     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
499
500   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
501                                                                         encode_datatype(MPI_CURRENT_TYPE),
502                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
503
504   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
505
506   TRACE_smpi_comm_out(Actor::self()->getPid());
507   log_timed_action (action, clock);
508 }
509
510 static void action_scatter(const char* const* action)
511 {
512   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
513         0 gather 68 68 0 0 0
514       where:
515         1) 68 is the sendcounts
516         2) 68 is the recvcounts
517         3) 0 is the root node
518         4) 0 is the send datatype id, see decode_datatype()
519         5) 0 is the recv datatype id, see decode_datatype()
520   */
521   CHECK_ACTION_PARAMS(action, 2, 3)
522   double clock                   = smpi_process()->simulated_elapsed();
523   int comm_size                  = MPI_COMM_WORLD->size();
524   int send_size                  = parse_double(action[2]);
525   int recv_size                  = parse_double(action[3]);
526   MPI_Datatype MPI_CURRENT_TYPE  = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
527   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
528
529   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
530   void* recv = nullptr;
531   int root   = (action[4]) ? atoi(action[4]) : 0;
532   int rank = MPI_COMM_WORLD->rank();
533
534   if (rank == root)
535     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
536
537   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
538                                                                         encode_datatype(MPI_CURRENT_TYPE),
539                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
540
541   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
542
543   TRACE_smpi_comm_out(Actor::self()->getPid());
544   log_timed_action(action, clock);
545 }
546
547 static void action_gatherv(const char *const *action) {
548   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
549        0 gather 68 68 10 10 10 0 0 0
550      where:
551        1) 68 is the sendcount
552        2) 68 10 10 10 is the recvcounts
553        3) 0 is the root node
554        4) 0 is the send datatype id, see decode_datatype()
555        5) 0 is the recv datatype id, see decode_datatype()
556   */
557   double clock = smpi_process()->simulated_elapsed();
558   int comm_size = MPI_COMM_WORLD->size();
559   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
560   int send_size = parse_double(action[2]);
561   std::vector<int> disps(comm_size, 0);
562   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
563
564   MPI_Datatype MPI_CURRENT_TYPE =
565       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
566   MPI_Datatype MPI_CURRENT_TYPE2{
567       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
568
569   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
570   void *recv = nullptr;
571   for(int i=0;i<comm_size;i++) {
572     (*recvcounts)[i] = atoi(action[i + 3]);
573   }
574   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
575
576   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
577   int rank = MPI_COMM_WORLD->rank();
578
579   if(rank==root)
580     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
581
582   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
583                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
584                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
585
586   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
587                  MPI_COMM_WORLD);
588
589   TRACE_smpi_comm_out(Actor::self()->getPid());
590   log_timed_action (action, clock);
591 }
592
593 static void action_scatterv(const char* const* action)
594 {
595   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
596        0 gather 68 10 10 10 68 0 0 0
597      where:
598        1) 68 10 10 10 is the sendcounts
599        2) 68 is the recvcount
600        3) 0 is the root node
601        4) 0 is the send datatype id, see decode_datatype()
602        5) 0 is the recv datatype id, see decode_datatype()
603   */
604   double clock  = smpi_process()->simulated_elapsed();
605   int comm_size = MPI_COMM_WORLD->size();
606   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
607   int recv_size = parse_double(action[2 + comm_size]);
608   std::vector<int> disps(comm_size, 0);
609   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
610
611   MPI_Datatype MPI_CURRENT_TYPE =
612       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
613   MPI_Datatype MPI_CURRENT_TYPE2{
614       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
615
616   void* send = nullptr;
617   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
618   for (int i = 0; i < comm_size; i++) {
619     (*sendcounts)[i] = atoi(action[i + 2]);
620   }
621   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
622
623   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
624   int rank = MPI_COMM_WORLD->rank();
625
626   if (rank == root)
627     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
628
629   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
630                                                                            nullptr, encode_datatype(MPI_CURRENT_TYPE),
631                                                                            encode_datatype(MPI_CURRENT_TYPE2)));
632
633   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
634                   MPI_COMM_WORLD);
635
636   TRACE_smpi_comm_out(Actor::self()->getPid());
637   log_timed_action(action, clock);
638 }
639
640 static void action_reducescatter(const char *const *action) {
641  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
642       0 reduceScatter 275427 275427 275427 204020 11346849 0
643     where:
644       1) The first four values after the name of the action declare the recvcounts array
645       2) The value 11346849 is the amount of instructions
646       3) The last value corresponds to the datatype, see decode_datatype().
647 */
648   double clock = smpi_process()->simulated_elapsed();
649   int comm_size = MPI_COMM_WORLD->size();
650   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
651   int comp_size = parse_double(action[2+comm_size]);
652   int my_proc_id                     = Actor::self()->getPid();
653   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
654   MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
655
656   for(int i=0;i<comm_size;i++) {
657     recvcounts->push_back(atoi(action[i + 2]));
658   }
659   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
660
661   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
662                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
663                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
664                                                        encode_datatype(MPI_CURRENT_TYPE)));
665
666   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
667   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
668
669   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
670   smpi_execute_flops(comp_size);
671
672   TRACE_smpi_comm_out(my_proc_id);
673   log_timed_action (action, clock);
674 }
675
676 static void action_allgather(const char *const *action) {
677   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
678         0 allGather 275427 275427
679     where:
680         1) 275427 is the sendcount
681         2) 275427 is the recvcount
682         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
683   */
684   double clock = smpi_process()->simulated_elapsed();
685
686   CHECK_ACTION_PARAMS(action, 2, 2)
687   int sendcount=atoi(action[2]);
688   int recvcount=atoi(action[3]);
689
690   MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
691   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
692
693   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
694   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
695
696   int my_proc_id = Actor::self()->getPid();
697
698   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
699                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
700                                                     encode_datatype(MPI_CURRENT_TYPE),
701                                                     encode_datatype(MPI_CURRENT_TYPE2)));
702
703   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
704
705   TRACE_smpi_comm_out(my_proc_id);
706   log_timed_action (action, clock);
707 }
708
709 static void action_allgatherv(const char *const *action) {
710   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
711         0 allGatherV 275427 275427 275427 275427 204020
712      where:
713         1) 275427 is the sendcount
714         2) The next four elements declare the recvcounts array
715         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
716   */
717   double clock = smpi_process()->simulated_elapsed();
718
719   int comm_size = MPI_COMM_WORLD->size();
720   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
721   int sendcount=atoi(action[2]);
722   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
723   std::vector<int> disps(comm_size, 0);
724
725   MPI_Datatype MPI_CURRENT_TYPE =
726       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
727   MPI_Datatype MPI_CURRENT_TYPE2{
728       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
729
730   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
731
732   for(int i=0;i<comm_size;i++) {
733     (*recvcounts)[i] = atoi(action[i + 3]);
734   }
735   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
736   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
737
738   int my_proc_id = Actor::self()->getPid();
739
740   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
741                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
742                                                        encode_datatype(MPI_CURRENT_TYPE),
743                                                        encode_datatype(MPI_CURRENT_TYPE2)));
744
745   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
746                     MPI_COMM_WORLD);
747
748   TRACE_smpi_comm_out(my_proc_id);
749   log_timed_action (action, clock);
750 }
751
752 static void action_allToAllv(const char *const *action) {
753   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
754         0 allToAllV 100 1 7 10 12 100 1 70 10 5
755      where:
756         1) 100 is the size of the send buffer *sizeof(int),
757         2) 1 7 10 12 is the sendcounts array
758         3) 100*sizeof(int) is the size of the receiver buffer
759         4)  1 70 10 5 is the recvcounts array
760   */
761   double clock = smpi_process()->simulated_elapsed();
762
763   int comm_size = MPI_COMM_WORLD->size();
764   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
765   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
766   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
767   std::vector<int> senddisps(comm_size, 0);
768   std::vector<int> recvdisps(comm_size, 0);
769
770   MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
771                                       ? decode_datatype(action[4 + 2 * comm_size])
772                                       : MPI_DEFAULT_TYPE;
773   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
774                                      ? decode_datatype(action[5 + 2 * comm_size])
775                                      : MPI_DEFAULT_TYPE};
776
777   int send_buf_size=parse_double(action[2]);
778   int recv_buf_size=parse_double(action[3+comm_size]);
779   int my_proc_id = Actor::self()->getPid();
780   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
781   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
782
783   for(int i=0;i<comm_size;i++) {
784     (*sendcounts)[i] = atoi(action[3 + i]);
785     (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
786   }
787   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
788   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
789
790   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
791                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
792                                                        encode_datatype(MPI_CURRENT_TYPE),
793                                                        encode_datatype(MPI_CURRENT_TYPE2)));
794
795   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
796                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
797
798   TRACE_smpi_comm_out(my_proc_id);
799   log_timed_action (action, clock);
800 }
801
802 }} // namespace simgrid::smpi
803
804 /** @brief Only initialize the replay, don't do it for real */
805 void smpi_replay_init(int* argc, char*** argv)
806 {
807   simgrid::smpi::Process::init(argc, argv);
808   smpi_process()->mark_as_initialized();
809   smpi_process()->set_replaying(true);
810
811   int my_proc_id = Actor::self()->getPid();
812   TRACE_smpi_init(my_proc_id);
813   TRACE_smpi_computing_init(my_proc_id);
814   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
815   TRACE_smpi_comm_out(my_proc_id);
816   xbt_replay_action_register("init",       simgrid::smpi::action_init);
817   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
818   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
819   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
820   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
821   xbt_replay_action_register("send",       simgrid::smpi::action_send);
822   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
823   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
824   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
825   xbt_replay_action_register("test",       simgrid::smpi::action_test);
826   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
827   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
828   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
829   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
830   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
831   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
832   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
833   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
834   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
835   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
836   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
837   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
838   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
839   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
840   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
841   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
842
843   //if we have a delayed start, sleep here.
844   if(*argc>2){
845     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
846     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
847     smpi_execute_flops(value);
848   } else {
849     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
850     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
851     smpi_execute_flops(0.0);
852   }
853 }
854
855 /** @brief actually run the replay after initialization */
856 void smpi_replay_main(int* argc, char*** argv)
857 {
858   simgrid::xbt::replay_runner(*argc, *argv);
859
860   /* and now, finalize everything */
861   /* One active process will stop. Decrease the counter*/
862   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
863   if (not get_reqq_self()->empty()) {
864     unsigned int count_requests=get_reqq_self()->size();
865     MPI_Request requests[count_requests];
866     MPI_Status status[count_requests];
867     unsigned int i=0;
868
869     for (auto const& req : *get_reqq_self()) {
870       requests[i] = req;
871       i++;
872     }
873     simgrid::smpi::Request::waitall(count_requests, requests, status);
874   }
875   delete get_reqq_self();
876   active_processes--;
877
878   if(active_processes==0){
879     /* Last process alive speaking: end the simulated timer */
880     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
881     xbt_free(sendbuffer);
882     xbt_free(recvbuffer);
883   }
884
885   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
886
887   smpi_process()->finalize();
888
889   TRACE_smpi_comm_out(Actor::self()->getPid());
890   TRACE_smpi_finalize(Actor::self()->getPid());
891 }
892
893 /** @brief chain a replay initialization and a replay start */
894 void smpi_replay_run(int* argc, char*** argv)
895 {
896   smpi_replay_init(argc, argv);
897   smpi_replay_main(argc, argv);
898 }