Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Cut k/m/Resource.[ch] to its own files
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 using simgrid::s4u::Actor;
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
23
24 int communicator_size = 0;
25 static int active_processes = 0;
26 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
27
28 MPI_Datatype MPI_DEFAULT_TYPE;
29 MPI_Datatype MPI_CURRENT_TYPE;
30
31 static int sendbuffer_size=0;
32 char* sendbuffer=nullptr;
33 static int recvbuffer_size=0;
34 char* recvbuffer=nullptr;
35
36 static void log_timed_action (const char *const *action, double clock){
37   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38     char *name = xbt_str_join_array(action, " ");
39     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
40     xbt_free(name);
41   }
42 }
43
44 static std::vector<MPI_Request>* get_reqq_self()
45 {
46   return reqq.at(Actor::self()->getPid());
47 }
48
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
50 {
51    reqq.insert({Actor::self()->getPid(), mpi_request});
52 }
53
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
56 {
57   if (not smpi_process()->replaying())
58     return xbt_malloc(size);
59   if (sendbuffer_size<size){
60     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
61     sendbuffer_size=size;
62   }
63   return sendbuffer;
64 }
65
66 //allocate a single buffer for all recv
67 void* smpi_get_tmp_recvbuffer(int size){
68   if (not smpi_process()->replaying())
69     return xbt_malloc(size);
70   if (recvbuffer_size<size){
71     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
72     recvbuffer_size=size;
73   }
74   return recvbuffer;
75 }
76
77 void smpi_free_tmp_buffer(void* buf){
78   if (not smpi_process()->replaying())
79     xbt_free(buf);
80 }
81
82 /* Helper function */
83 static double parse_double(const char *string)
84 {
85   char *endptr;
86   double value = strtod(string, &endptr);
87   if (*endptr != '\0')
88     THROWF(unknown_error, 0, "%s is not a double", string);
89   return value;
90 }
91
92
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
94 static MPI_Datatype decode_datatype(const char *const action)
95 {
96   switch(atoi(action)) {
97     case 0:
98       MPI_CURRENT_TYPE=MPI_DOUBLE;
99       break;
100     case 1:
101       MPI_CURRENT_TYPE=MPI_INT;
102       break;
103     case 2:
104       MPI_CURRENT_TYPE=MPI_CHAR;
105       break;
106     case 3:
107       MPI_CURRENT_TYPE=MPI_SHORT;
108       break;
109     case 4:
110       MPI_CURRENT_TYPE=MPI_LONG;
111       break;
112     case 5:
113       MPI_CURRENT_TYPE=MPI_FLOAT;
114       break;
115     case 6:
116       MPI_CURRENT_TYPE=MPI_BYTE;
117       break;
118     default:
119       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
120       break;
121   }
122    return MPI_CURRENT_TYPE;
123 }
124
125 const char* encode_datatype(MPI_Datatype datatype)
126 {
127   if (datatype==MPI_BYTE)
128       return "";
129   if(datatype==MPI_DOUBLE)
130       return "0";
131   if(datatype==MPI_INT)
132       return "1";
133   if(datatype==MPI_CHAR)
134       return "2";
135   if(datatype==MPI_SHORT)
136       return "3";
137   if(datatype==MPI_LONG)
138     return "4";
139   if(datatype==MPI_FLOAT)
140       return "5";
141   // default - not implemented.
142   // do not warn here as we pass in this function even for other trace formats
143   return "-1";
144 }
145
146 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
147     int i=0;\
148     while(action[i]!=nullptr)\
149      i++;\
150     if(i<mandatory+2)                                           \
151     THROWF(arg_error, 0, "%s replay failed.\n" \
152           "%d items were given on the line. First two should be process_id and action.  " \
153           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
154           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
155   }
156
157 namespace simgrid {
158 namespace smpi {
159
160 static void action_init(const char *const *action)
161 {
162   XBT_DEBUG("Initialize the counters");
163   CHECK_ACTION_PARAMS(action, 0, 1)
164   if(action[2])
165     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
166   else
167     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
168
169   /* start a simulated timer */
170   smpi_process()->simulated_start();
171   /*initialize the number of active processes */
172   active_processes = smpi_process_count();
173
174   set_reqq_self(new std::vector<MPI_Request>);
175 }
176
177 static void action_finalize(const char *const *action)
178 {
179   /* Nothing to do */
180 }
181
182 static void action_comm_size(const char *const *action)
183 {
184   communicator_size = parse_double(action[2]);
185   log_timed_action (action, smpi_process()->simulated_elapsed());
186 }
187
188 static void action_comm_split(const char *const *action)
189 {
190   log_timed_action (action, smpi_process()->simulated_elapsed());
191 }
192
193 static void action_comm_dup(const char *const *action)
194 {
195   log_timed_action (action, smpi_process()->simulated_elapsed());
196 }
197
198 static void action_compute(const char *const *action)
199 {
200   CHECK_ACTION_PARAMS(action, 1, 0)
201   double clock = smpi_process()->simulated_elapsed();
202   double flops= parse_double(action[2]);
203   int my_proc_id = Actor::self()->getPid();
204
205   TRACE_smpi_computing_in(my_proc_id, flops);
206   smpi_execute_flops(flops);
207   TRACE_smpi_computing_out(my_proc_id);
208
209   log_timed_action (action, clock);
210 }
211
212 static void action_send(const char *const *action)
213 {
214   CHECK_ACTION_PARAMS(action, 2, 1)
215   int to = atoi(action[2]);
216   double size=parse_double(action[3]);
217   double clock = smpi_process()->simulated_elapsed();
218
219   if(action[4])
220     MPI_CURRENT_TYPE=decode_datatype(action[4]);
221   else
222     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
223
224   int my_proc_id = Actor::self()->getPid();
225   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
226
227   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
228                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
229   if (not TRACE_smpi_view_internals())
230     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
231
232   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
233
234   TRACE_smpi_comm_out(my_proc_id);
235
236   log_timed_action(action, clock);
237 }
238
239 static void action_Isend(const char *const *action)
240 {
241   CHECK_ACTION_PARAMS(action, 2, 1)
242   int to = atoi(action[2]);
243   double size=parse_double(action[3]);
244   double clock = smpi_process()->simulated_elapsed();
245
246   if(action[4])
247     MPI_CURRENT_TYPE=decode_datatype(action[4]);
248   else
249     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250
251   int my_proc_id = Actor::self()->getPid();
252   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
253   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
254                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
255   if (not TRACE_smpi_view_internals())
256     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
257
258   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
259
260   TRACE_smpi_comm_out(my_proc_id);
261
262   get_reqq_self()->push_back(request);
263
264   log_timed_action (action, clock);
265 }
266
267 static void action_recv(const char *const *action) {
268   CHECK_ACTION_PARAMS(action, 2, 1)
269   int from = atoi(action[2]);
270   double size=parse_double(action[3]);
271   double clock = smpi_process()->simulated_elapsed();
272   MPI_Status status;
273
274   if(action[4])
275     MPI_CURRENT_TYPE=decode_datatype(action[4]);
276   else
277     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
278
279   int my_proc_id = Actor::self()->getPid();
280   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
281
282   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
283                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
284
285   //unknown size from the receiver point of view
286   if (size <= 0.0) {
287     Request::probe(from, 0, MPI_COMM_WORLD, &status);
288     size=status.count;
289   }
290
291   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
292
293   TRACE_smpi_comm_out(my_proc_id);
294   if (not TRACE_smpi_view_internals()) {
295     TRACE_smpi_recv(src_traced, my_proc_id, 0);
296   }
297
298   log_timed_action (action, clock);
299 }
300
301 static void action_Irecv(const char *const *action)
302 {
303   CHECK_ACTION_PARAMS(action, 2, 1)
304   int from = atoi(action[2]);
305   double size=parse_double(action[3]);
306   double clock = smpi_process()->simulated_elapsed();
307
308   if(action[4])
309     MPI_CURRENT_TYPE=decode_datatype(action[4]);
310   else
311     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
312
313   int my_proc_id = Actor::self()->getPid();
314   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
315                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
316   MPI_Status status;
317   //unknow size from the receiver pov
318   if (size <= 0.0) {
319     Request::probe(from, 0, MPI_COMM_WORLD, &status);
320     size = status.count;
321   }
322
323   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
324
325   TRACE_smpi_comm_out(my_proc_id);
326   get_reqq_self()->push_back(request);
327
328   log_timed_action (action, clock);
329 }
330
331 static void action_test(const char* const* action)
332 {
333   CHECK_ACTION_PARAMS(action, 0, 0)
334   double clock = smpi_process()->simulated_elapsed();
335   MPI_Status status;
336
337   MPI_Request request = get_reqq_self()->back();
338   get_reqq_self()->pop_back();
339   //if request is null here, this may mean that a previous test has succeeded
340   //Different times in traced application and replayed version may lead to this
341   //In this case, ignore the extra calls.
342   if(request!=nullptr){
343     int my_proc_id = Actor::self()->getPid();
344     TRACE_smpi_testing_in(my_proc_id);
345
346     int flag = Request::test(&request, &status);
347
348     XBT_DEBUG("MPI_Test result: %d", flag);
349     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
350     get_reqq_self()->push_back(request);
351
352     TRACE_smpi_testing_out(my_proc_id);
353   }
354   log_timed_action (action, clock);
355 }
356
357 static void action_wait(const char *const *action){
358   CHECK_ACTION_PARAMS(action, 0, 0)
359   double clock = smpi_process()->simulated_elapsed();
360   MPI_Status status;
361
362   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
363       xbt_str_join_array(action," "));
364   MPI_Request request = get_reqq_self()->back();
365   get_reqq_self()->pop_back();
366
367   if (request==nullptr){
368     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
369     return;
370   }
371
372   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
373
374   MPI_Group group = request->comm()->group();
375   int src_traced = group->rank(request->src());
376   int dst_traced = group->rank(request->dst());
377   int is_wait_for_receive = (request->flags() & RECV);
378   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
379
380   Request::wait(&request, &status);
381
382   TRACE_smpi_comm_out(rank);
383   if (is_wait_for_receive)
384     TRACE_smpi_recv(src_traced, dst_traced, 0);
385   log_timed_action (action, clock);
386 }
387
388 static void action_waitall(const char *const *action){
389   CHECK_ACTION_PARAMS(action, 0, 0)
390   double clock = smpi_process()->simulated_elapsed();
391   const unsigned int count_requests = get_reqq_self()->size();
392
393   if (count_requests>0) {
394     MPI_Status status[count_requests];
395
396     int my_proc_id_traced = Actor::self()->getPid();
397     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
398                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
399     int recvs_snd[count_requests];
400     int recvs_rcv[count_requests];
401     for (unsigned int i = 0; i < count_requests; i++) {
402       const auto& req = (*get_reqq_self())[i];
403       if (req && (req->flags() & RECV)) {
404         recvs_snd[i] = req->src();
405         recvs_rcv[i] = req->dst();
406       } else
407         recvs_snd[i] = -100;
408    }
409    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
410
411    for (unsigned i = 0; i < count_requests; i++) {
412      if (recvs_snd[i]!=-100)
413        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
414    }
415    TRACE_smpi_comm_out(my_proc_id_traced);
416   }
417   log_timed_action (action, clock);
418 }
419
420 static void action_barrier(const char *const *action){
421   double clock = smpi_process()->simulated_elapsed();
422   int my_proc_id = Actor::self()->getPid();
423   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
424
425   Colls::barrier(MPI_COMM_WORLD);
426
427   TRACE_smpi_comm_out(my_proc_id);
428   log_timed_action (action, clock);
429 }
430
431 static void action_bcast(const char *const *action)
432 {
433   CHECK_ACTION_PARAMS(action, 1, 2)
434   double size = parse_double(action[2]);
435   double clock = smpi_process()->simulated_elapsed();
436   int root=0;
437   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
438   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
439
440   if(action[3]) {
441     root= atoi(action[3]);
442     if(action[4])
443       MPI_CURRENT_TYPE=decode_datatype(action[4]);
444   }
445
446   int my_proc_id = Actor::self()->getPid();
447   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
448                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
449                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
450
451   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
452
453   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
454
455   TRACE_smpi_comm_out(my_proc_id);
456   log_timed_action (action, clock);
457 }
458
459 static void action_reduce(const char *const *action)
460 {
461   CHECK_ACTION_PARAMS(action, 2, 2)
462   double comm_size = parse_double(action[2]);
463   double comp_size = parse_double(action[3]);
464   double clock = smpi_process()->simulated_elapsed();
465   int root=0;
466   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
467
468   if(action[4]) {
469     root= atoi(action[4]);
470     if(action[5])
471       MPI_CURRENT_TYPE=decode_datatype(action[5]);
472   }
473
474   int my_proc_id = Actor::self()->getPid();
475   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
476                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
477                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
478
479   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
481   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
482   smpi_execute_flops(comp_size);
483
484   TRACE_smpi_comm_out(my_proc_id);
485   log_timed_action (action, clock);
486 }
487
488 static void action_allReduce(const char *const *action) {
489   CHECK_ACTION_PARAMS(action, 2, 1)
490   double comm_size = parse_double(action[2]);
491   double comp_size = parse_double(action[3]);
492
493   if(action[4])
494     MPI_CURRENT_TYPE=decode_datatype(action[4]);
495   else
496     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
497
498   double clock = smpi_process()->simulated_elapsed();
499   int my_proc_id = Actor::self()->getPid();
500   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
501                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
502
503   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
504   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
505   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
506   smpi_execute_flops(comp_size);
507
508   TRACE_smpi_comm_out(my_proc_id);
509   log_timed_action (action, clock);
510 }
511
512 static void action_allToAll(const char *const *action) {
513   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
514   double clock = smpi_process()->simulated_elapsed();
515   int comm_size = MPI_COMM_WORLD->size();
516   int send_size = parse_double(action[2]);
517   int recv_size = parse_double(action[3]);
518   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
519
520   if(action[4] && action[5]) {
521     MPI_CURRENT_TYPE=decode_datatype(action[4]);
522     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
523   }
524   else
525     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
526
527   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
528   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
529
530   int my_proc_id = Actor::self()->getPid();
531   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
532                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
533                                                     encode_datatype(MPI_CURRENT_TYPE),
534                                                     encode_datatype(MPI_CURRENT_TYPE2)));
535
536   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
537
538   TRACE_smpi_comm_out(my_proc_id);
539   log_timed_action (action, clock);
540 }
541
542 static void action_gather(const char *const *action) {
543   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
544         0 gather 68 68 0 0 0
545       where:
546         1) 68 is the sendcounts
547         2) 68 is the recvcounts
548         3) 0 is the root node
549         4) 0 is the send datatype id, see decode_datatype()
550         5) 0 is the recv datatype id, see decode_datatype()
551   */
552   CHECK_ACTION_PARAMS(action, 2, 3)
553   double clock = smpi_process()->simulated_elapsed();
554   int comm_size = MPI_COMM_WORLD->size();
555   int send_size = parse_double(action[2]);
556   int recv_size = parse_double(action[3]);
557   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
558   if(action[4] && action[5]) {
559     MPI_CURRENT_TYPE=decode_datatype(action[5]);
560     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
561   } else {
562     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
563   }
564   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
565   void *recv = nullptr;
566   int root=0;
567   if(action[4])
568     root=atoi(action[4]);
569   int rank = MPI_COMM_WORLD->rank();
570
571   if(rank==root)
572     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
573
574   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
575                                                                         encode_datatype(MPI_CURRENT_TYPE),
576                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
577
578   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
579
580   TRACE_smpi_comm_out(Actor::self()->getPid());
581   log_timed_action (action, clock);
582 }
583
584 static void action_scatter(const char* const* action)
585 {
586   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
587         0 gather 68 68 0 0 0
588       where:
589         1) 68 is the sendcounts
590         2) 68 is the recvcounts
591         3) 0 is the root node
592         4) 0 is the send datatype id, see decode_datatype()
593         5) 0 is the recv datatype id, see decode_datatype()
594   */
595   CHECK_ACTION_PARAMS(action, 2, 3)
596   double clock                   = smpi_process()->simulated_elapsed();
597   int comm_size                  = MPI_COMM_WORLD->size();
598   int send_size                  = parse_double(action[2]);
599   int recv_size                  = parse_double(action[3]);
600   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
601   if (action[4] && action[5]) {
602     MPI_CURRENT_TYPE  = decode_datatype(action[5]);
603     MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
604   } else {
605     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
606   }
607   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
608   void* recv = nullptr;
609   int root   = 0;
610   if (action[4])
611     root   = atoi(action[4]);
612   int rank = MPI_COMM_WORLD->rank();
613
614   if (rank == root)
615     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
616
617   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
618                                                                         encode_datatype(MPI_CURRENT_TYPE),
619                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
620
621   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
622
623   TRACE_smpi_comm_out(Actor::self()->getPid());
624   log_timed_action(action, clock);
625 }
626
627 static void action_gatherv(const char *const *action) {
628   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
629        0 gather 68 68 10 10 10 0 0 0
630      where:
631        1) 68 is the sendcount
632        2) 68 10 10 10 is the recvcounts
633        3) 0 is the root node
634        4) 0 is the send datatype id, see decode_datatype()
635        5) 0 is the recv datatype id, see decode_datatype()
636   */
637   double clock = smpi_process()->simulated_elapsed();
638   int comm_size = MPI_COMM_WORLD->size();
639   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
640   int send_size = parse_double(action[2]);
641   int disps[comm_size];
642   int recvcounts[comm_size];
643   int recv_sum=0;
644
645   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
646   if(action[4+comm_size] && action[5+comm_size]) {
647     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
648     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
649   } else
650     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
651
652   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
653   void *recv = nullptr;
654   for(int i=0;i<comm_size;i++) {
655     recvcounts[i] = atoi(action[i+3]);
656     recv_sum=recv_sum+recvcounts[i];
657     disps[i]=0;
658   }
659
660   int root=atoi(action[3+comm_size]);
661   int rank = MPI_COMM_WORLD->rank();
662
663   if(rank==root)
664     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
665
666   std::vector<int>* trace_recvcounts = new std::vector<int>;
667   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
668     trace_recvcounts->push_back(recvcounts[i]);
669
670   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
671                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
672                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
673
674   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
675
676   TRACE_smpi_comm_out(Actor::self()->getPid());
677   log_timed_action (action, clock);
678 }
679
680 static void action_scatterv(const char* const* action)
681 {
682   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
683        0 gather 68 10 10 10 68 0 0 0
684      where:
685        1) 68 10 10 10 is the sendcounts
686        2) 68 is the recvcount
687        3) 0 is the root node
688        4) 0 is the send datatype id, see decode_datatype()
689        5) 0 is the recv datatype id, see decode_datatype()
690   */
691   double clock  = smpi_process()->simulated_elapsed();
692   int comm_size = MPI_COMM_WORLD->size();
693   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
694   int recv_size = parse_double(action[2 + comm_size]);
695   int disps[comm_size];
696   int sendcounts[comm_size];
697   int send_sum = 0;
698
699   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
700   if (action[4 + comm_size] && action[5 + comm_size]) {
701     MPI_CURRENT_TYPE  = decode_datatype(action[4 + comm_size]);
702     MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
703   } else
704     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
705
706   void* send = nullptr;
707   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
708   for (int i = 0; i < comm_size; i++) {
709     sendcounts[i] = atoi(action[i + 2]);
710     send_sum += sendcounts[i];
711     disps[i] = 0;
712   }
713
714   int root = atoi(action[3 + comm_size]);
715   int rank = MPI_COMM_WORLD->rank();
716
717   if (rank == root)
718     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
719
720   std::vector<int>* trace_sendcounts = new std::vector<int>;
721   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
722     trace_sendcounts->push_back(sendcounts[i]);
723
724   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
725                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
726                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
727
728   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
729
730   TRACE_smpi_comm_out(Actor::self()->getPid());
731   log_timed_action(action, clock);
732 }
733
734 static void action_reducescatter(const char *const *action) {
735  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
736       0 reduceScatter 275427 275427 275427 204020 11346849 0
737     where:
738       1) The first four values after the name of the action declare the recvcounts array
739       2) The value 11346849 is the amount of instructions
740       3) The last value corresponds to the datatype, see decode_datatype().
741 */
742   double clock = smpi_process()->simulated_elapsed();
743   int comm_size = MPI_COMM_WORLD->size();
744   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
745   int comp_size = parse_double(action[2+comm_size]);
746   int recvcounts[comm_size];
747   int my_proc_id                     = Actor::self()->getPid();
748   int size = 0;
749   std::vector<int>* trace_recvcounts = new std::vector<int>;
750   if(action[3+comm_size])
751     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
752   else
753     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
754
755   for(int i=0;i<comm_size;i++) {
756     recvcounts[i] = atoi(action[i+2]);
757     trace_recvcounts->push_back(recvcounts[i]);
758     size+=recvcounts[i];
759   }
760
761   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
762                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
763                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
764                                                        encode_datatype(MPI_CURRENT_TYPE)));
765
766   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
767   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
768
769   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
770   smpi_execute_flops(comp_size);
771
772   TRACE_smpi_comm_out(my_proc_id);
773   log_timed_action (action, clock);
774 }
775
776 static void action_allgather(const char *const *action) {
777   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
778         0 allGather 275427 275427
779     where:
780         1) 275427 is the sendcount
781         2) 275427 is the recvcount
782         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
783   */
784   double clock = smpi_process()->simulated_elapsed();
785
786   CHECK_ACTION_PARAMS(action, 2, 2)
787   int sendcount=atoi(action[2]);
788   int recvcount=atoi(action[3]);
789
790   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
791
792   if(action[4] && action[5]) {
793     MPI_CURRENT_TYPE = decode_datatype(action[4]);
794     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
795   } else
796     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
797
798   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
799   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
800
801   int my_proc_id = Actor::self()->getPid();
802
803   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
804                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
805                                                     encode_datatype(MPI_CURRENT_TYPE),
806                                                     encode_datatype(MPI_CURRENT_TYPE2)));
807
808   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
809
810   TRACE_smpi_comm_out(my_proc_id);
811   log_timed_action (action, clock);
812 }
813
814 static void action_allgatherv(const char *const *action) {
815   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
816         0 allGatherV 275427 275427 275427 275427 204020
817      where:
818         1) 275427 is the sendcount
819         2) The next four elements declare the recvcounts array
820         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
821   */
822   double clock = smpi_process()->simulated_elapsed();
823
824   int comm_size = MPI_COMM_WORLD->size();
825   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
826   int sendcount=atoi(action[2]);
827   int recvcounts[comm_size];
828   int disps[comm_size];
829   int recv_sum=0;
830   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
831
832   if(action[3+comm_size] && action[4+comm_size]) {
833     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
834     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
835   } else
836     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
837
838   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
839
840   for(int i=0;i<comm_size;i++) {
841     recvcounts[i] = atoi(action[i+3]);
842     recv_sum=recv_sum+recvcounts[i];
843     disps[i] = 0;
844   }
845   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
846
847   int my_proc_id = Actor::self()->getPid();
848
849   std::vector<int>* trace_recvcounts = new std::vector<int>;
850   for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
851     trace_recvcounts->push_back(recvcounts[i]);
852
853   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
854                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
855                                                        encode_datatype(MPI_CURRENT_TYPE),
856                                                        encode_datatype(MPI_CURRENT_TYPE2)));
857
858   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
859                           MPI_COMM_WORLD);
860
861   TRACE_smpi_comm_out(my_proc_id);
862   log_timed_action (action, clock);
863 }
864
865 static void action_allToAllv(const char *const *action) {
866   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
867         0 allToAllV 100 1 7 10 12 100 1 70 10 5
868      where:
869         1) 100 is the size of the send buffer *sizeof(int),
870         2) 1 7 10 12 is the sendcounts array
871         3) 100*sizeof(int) is the size of the receiver buffer
872         4)  1 70 10 5 is the recvcounts array
873   */
874   double clock = smpi_process()->simulated_elapsed();
875
876   int comm_size = MPI_COMM_WORLD->size();
877   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
878   int send_size = 0;
879   int recv_size = 0;
880   int sendcounts[comm_size];
881   std::vector<int>* trace_sendcounts = new std::vector<int>;
882   int recvcounts[comm_size];
883   std::vector<int>* trace_recvcounts = new std::vector<int>;
884   int senddisps[comm_size];
885   int recvdisps[comm_size];
886
887   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
888
889   int send_buf_size=parse_double(action[2]);
890   int recv_buf_size=parse_double(action[3+comm_size]);
891   if(action[4+2*comm_size] && action[5+2*comm_size]) {
892     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
893     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
894   }
895   else
896     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
897
898   int my_proc_id = Actor::self()->getPid();
899   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
900   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
901
902   for(int i=0;i<comm_size;i++) {
903     sendcounts[i] = atoi(action[i+3]);
904     trace_sendcounts->push_back(sendcounts[i]);
905     send_size += sendcounts[i];
906     recvcounts[i] = atoi(action[i+4+comm_size]);
907     trace_recvcounts->push_back(recvcounts[i]);
908     recv_size += recvcounts[i];
909     senddisps[i] = 0;
910     recvdisps[i] = 0;
911   }
912
913   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
914                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
915                                                        trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
916                                                        encode_datatype(MPI_CURRENT_TYPE2)));
917
918   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
919                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
920
921   TRACE_smpi_comm_out(my_proc_id);
922   log_timed_action (action, clock);
923 }
924
925 }} // namespace simgrid::smpi
926
927 /** @brief Only initialize the replay, don't do it for real */
928 void smpi_replay_init(int* argc, char*** argv)
929 {
930   simgrid::smpi::Process::init(argc, argv);
931   smpi_process()->mark_as_initialized();
932   smpi_process()->set_replaying(true);
933
934   int my_proc_id = Actor::self()->getPid();
935   TRACE_smpi_init(my_proc_id);
936   TRACE_smpi_computing_init(my_proc_id);
937   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
938   TRACE_smpi_comm_out(my_proc_id);
939   xbt_replay_action_register("init",       simgrid::smpi::action_init);
940   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
941   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
942   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
943   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
944   xbt_replay_action_register("send",       simgrid::smpi::action_send);
945   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
946   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
947   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
948   xbt_replay_action_register("test",       simgrid::smpi::action_test);
949   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
950   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
951   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
952   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
953   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
954   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
955   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
956   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
957   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
958   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
959   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
960   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
961   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
962   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
963   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
964   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
965
966   //if we have a delayed start, sleep here.
967   if(*argc>2){
968     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
969     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
970     smpi_execute_flops(value);
971   } else {
972     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
973     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
974     smpi_execute_flops(0.0);
975   }
976 }
977
978 /** @brief actually run the replay after initialization */
979 void smpi_replay_main(int* argc, char*** argv)
980 {
981   simgrid::xbt::replay_runner(*argc, *argv);
982
983   /* and now, finalize everything */
984   /* One active process will stop. Decrease the counter*/
985   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
986   if (not get_reqq_self()->empty()) {
987     unsigned int count_requests=get_reqq_self()->size();
988     MPI_Request requests[count_requests];
989     MPI_Status status[count_requests];
990     unsigned int i=0;
991
992     for (auto const& req : *get_reqq_self()) {
993       requests[i] = req;
994       i++;
995     }
996     simgrid::smpi::Request::waitall(count_requests, requests, status);
997   }
998   delete get_reqq_self();
999   active_processes--;
1000
1001   if(active_processes==0){
1002     /* Last process alive speaking: end the simulated timer */
1003     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
1004     xbt_free(sendbuffer);
1005     xbt_free(recvbuffer);
1006   }
1007
1008   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1009
1010   smpi_process()->finalize();
1011
1012   TRACE_smpi_comm_out(Actor::self()->getPid());
1013   TRACE_smpi_finalize(Actor::self()->getPid());
1014 }
1015
1016 /** @brief chain a replay initialization and a replay start */
1017 void smpi_replay_run(int* argc, char*** argv)
1018 {
1019   smpi_replay_init(argc, argv);
1020   smpi_replay_main(argc, argv);
1021 }