Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'clean_events' of github.com:Takishipp/simgrid into clean_events
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/smpi/private.h"
7 #include "src/smpi/smpi_coll.hpp"
8 #include "src/smpi/smpi_comm.hpp"
9 #include "src/smpi/smpi_datatype.hpp"
10 #include "src/smpi/smpi_group.hpp"
11 #include "src/smpi/smpi_process.hpp"
12 #include "src/smpi/smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 int communicator_size = 0;
23 static int active_processes = 0;
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
25
26 MPI_Datatype MPI_DEFAULT_TYPE;
27 MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(smpi_process()->index());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({smpi_process()->index(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90 static MPI_Datatype decode_datatype(const char *const action)
91 {
92   switch(atoi(action)) {
93     case 0:
94       MPI_CURRENT_TYPE=MPI_DOUBLE;
95       break;
96     case 1:
97       MPI_CURRENT_TYPE=MPI_INT;
98       break;
99     case 2:
100       MPI_CURRENT_TYPE=MPI_CHAR;
101       break;
102     case 3:
103       MPI_CURRENT_TYPE=MPI_SHORT;
104       break;
105     case 4:
106       MPI_CURRENT_TYPE=MPI_LONG;
107       break;
108     case 5:
109       MPI_CURRENT_TYPE=MPI_FLOAT;
110       break;
111     case 6:
112       MPI_CURRENT_TYPE=MPI_BYTE;
113       break;
114     default:
115       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
116       break;
117   }
118    return MPI_CURRENT_TYPE;
119 }
120
121 const char* encode_datatype(MPI_Datatype datatype, int* known)
122 {
123   //default type for output is set to MPI_BYTE
124   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
125   if(known!=nullptr)
126     *known=1;
127   if (datatype==MPI_BYTE)
128       return "";
129   if(datatype==MPI_DOUBLE)
130       return "0";
131   if(datatype==MPI_INT)
132       return "1";
133   if(datatype==MPI_CHAR)
134       return "2";
135   if(datatype==MPI_SHORT)
136       return "3";
137   if(datatype==MPI_LONG)
138     return "4";
139   if(datatype==MPI_FLOAT)
140       return "5";
141   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
142   if(known!=nullptr)
143     *known=0;
144   // default - not implemented.
145   // do not warn here as we pass in this function even for other trace formats
146   return "-1";
147 }
148
149 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
150     int i=0;\
151     while(action[i]!=nullptr)\
152      i++;\
153     if(i<mandatory+2)                                           \
154     THROWF(arg_error, 0, "%s replay failed.\n" \
155           "%d items were given on the line. First two should be process_id and action.  " \
156           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
157           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
158   }
159
160 namespace simgrid {
161 namespace smpi {
162
163 static void action_init(const char *const *action)
164 {
165   XBT_DEBUG("Initialize the counters");
166   CHECK_ACTION_PARAMS(action, 0, 1)
167   if(action[2])
168     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype
169   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
170
171   /* start a simulated timer */
172   smpi_process()->simulated_start();
173   /*initialize the number of active processes */
174   active_processes = smpi_process_count();
175
176   set_reqq_self(new std::vector<MPI_Request>);
177 }
178
179 static void action_finalize(const char *const *action)
180 {
181   /* Nothing to do */
182 }
183
184 static void action_comm_size(const char *const *action)
185 {
186   communicator_size = parse_double(action[2]);
187   log_timed_action (action, smpi_process()->simulated_elapsed());
188 }
189
190 static void action_comm_split(const char *const *action)
191 {
192   log_timed_action (action, smpi_process()->simulated_elapsed());
193 }
194
195 static void action_comm_dup(const char *const *action)
196 {
197   log_timed_action (action, smpi_process()->simulated_elapsed());
198 }
199
200 static void action_compute(const char *const *action)
201 {
202   CHECK_ACTION_PARAMS(action, 1, 0)
203   double clock = smpi_process()->simulated_elapsed();
204   double flops= parse_double(action[2]);
205   int rank = smpi_process()->index();
206   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
207   extra->type=TRACING_COMPUTING;
208   extra->comp_size=flops;
209   TRACE_smpi_computing_in(rank, extra);
210
211   smpi_execute_flops(flops);
212
213   TRACE_smpi_computing_out(rank);
214   log_timed_action (action, clock);
215 }
216
217 static void action_send(const char *const *action)
218 {
219   CHECK_ACTION_PARAMS(action, 2, 1)
220   int to = atoi(action[2]);
221   double size=parse_double(action[3]);
222   double clock = smpi_process()->simulated_elapsed();
223
224   if(action[4])
225     MPI_CURRENT_TYPE=decode_datatype(action[4]);
226   else
227     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
228
229   int rank = smpi_process()->index();
230
231   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
232   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233   extra->type = TRACING_SEND;
234   extra->send_size = size;
235   extra->src = rank;
236   extra->dst = dst_traced;
237   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
238   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
239   if (not TRACE_smpi_view_internals())
240     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
241
242   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
243
244   log_timed_action (action, clock);
245
246   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
247 }
248
249 static void action_Isend(const char *const *action)
250 {
251   CHECK_ACTION_PARAMS(action, 2, 1)
252   int to = atoi(action[2]);
253   double size=parse_double(action[3]);
254   double clock = smpi_process()->simulated_elapsed();
255
256   if(action[4])
257     MPI_CURRENT_TYPE=decode_datatype(action[4]);
258   else
259     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
260
261   int rank = smpi_process()->index();
262   int dst_traced = MPI_COMM_WORLD->group()->rank(to);
263   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
264   extra->type = TRACING_ISEND;
265   extra->send_size = size;
266   extra->src = rank;
267   extra->dst = dst_traced;
268   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
269   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
270   if (not TRACE_smpi_view_internals())
271     TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
272
273   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
274
275   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
276
277   get_reqq_self()->push_back(request);
278
279   log_timed_action (action, clock);
280 }
281
282 static void action_recv(const char *const *action) {
283   CHECK_ACTION_PARAMS(action, 2, 1)
284   int from = atoi(action[2]);
285   double size=parse_double(action[3]);
286   double clock = smpi_process()->simulated_elapsed();
287   MPI_Status status;
288
289   if(action[4])
290     MPI_CURRENT_TYPE=decode_datatype(action[4]);
291   else
292     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
293
294   int rank = smpi_process()->index();
295   int src_traced = MPI_COMM_WORLD->group()->rank(from);
296
297   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
298   extra->type = TRACING_RECV;
299   extra->send_size = size;
300   extra->src = src_traced;
301   extra->dst = rank;
302   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
303   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
304
305   //unknown size from the receiver point of view
306   if(size<=0.0){
307     Request::probe(from, 0, MPI_COMM_WORLD, &status);
308     size=status.count;
309   }
310
311   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
312
313   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
314   if (not TRACE_smpi_view_internals()) {
315     TRACE_smpi_recv(rank, src_traced, rank, 0);
316   }
317
318   log_timed_action (action, clock);
319 }
320
321 static void action_Irecv(const char *const *action)
322 {
323   CHECK_ACTION_PARAMS(action, 2, 1)
324   int from = atoi(action[2]);
325   double size=parse_double(action[3]);
326   double clock = smpi_process()->simulated_elapsed();
327
328   if(action[4])
329     MPI_CURRENT_TYPE=decode_datatype(action[4]);
330   else
331     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
332
333   int rank = smpi_process()->index();
334   int src_traced = MPI_COMM_WORLD->group()->rank(from);
335   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
336   extra->type = TRACING_IRECV;
337   extra->send_size = size;
338   extra->src = src_traced;
339   extra->dst = rank;
340   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
341   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
342   MPI_Status status;
343   //unknow size from the receiver pov
344   if(size<=0.0){
345       Request::probe(from, 0, MPI_COMM_WORLD, &status);
346       size=status.count;
347   }
348
349   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
350
351   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
352   get_reqq_self()->push_back(request);
353
354   log_timed_action (action, clock);
355 }
356
357 static void action_test(const char *const *action){
358   CHECK_ACTION_PARAMS(action, 0, 0)
359   double clock = smpi_process()->simulated_elapsed();
360   MPI_Status status;
361
362   MPI_Request request = get_reqq_self()->back();
363   get_reqq_self()->pop_back();
364   //if request is null here, this may mean that a previous test has succeeded
365   //Different times in traced application and replayed version may lead to this
366   //In this case, ignore the extra calls.
367   if(request!=nullptr){
368     int rank = smpi_process()->index();
369     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
370     extra->type=TRACING_TEST;
371     TRACE_smpi_testing_in(rank, extra);
372
373     int flag = Request::test(&request, &status);
374
375     XBT_DEBUG("MPI_Test result: %d", flag);
376     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
377     get_reqq_self()->push_back(request);
378
379     TRACE_smpi_testing_out(rank);
380   }
381   log_timed_action (action, clock);
382 }
383
384 static void action_wait(const char *const *action){
385   CHECK_ACTION_PARAMS(action, 0, 0)
386   double clock = smpi_process()->simulated_elapsed();
387   MPI_Status status;
388
389   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
390       xbt_str_join_array(action," "));
391   MPI_Request request = get_reqq_self()->back();
392   get_reqq_self()->pop_back();
393
394   if (request==nullptr){
395     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
396     return;
397   }
398
399   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
400
401   MPI_Group group = request->comm()->group();
402   int src_traced = group->rank(request->src());
403   int dst_traced = group->rank(request->dst());
404   int is_wait_for_receive = (request->flags() & RECV);
405   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
406   extra->type = TRACING_WAIT;
407   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
408
409   Request::wait(&request, &status);
410
411   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
412   if (is_wait_for_receive)
413     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
414   log_timed_action (action, clock);
415 }
416
417 static void action_waitall(const char *const *action){
418   CHECK_ACTION_PARAMS(action, 0, 0)
419   double clock = smpi_process()->simulated_elapsed();
420   unsigned int count_requests=get_reqq_self()->size();
421
422   if (count_requests>0) {
423     MPI_Status status[count_requests];
424
425    int rank_traced = smpi_process()->index();
426    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
427    extra->type = TRACING_WAITALL;
428    extra->send_size=count_requests;
429    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
430    int recvs_snd[count_requests];
431    int recvs_rcv[count_requests];
432    unsigned int i=0;
433    for (auto req : *(get_reqq_self())){
434      if (req && (req->flags () & RECV)){
435        recvs_snd[i]=req->src();
436        recvs_rcv[i]=req->dst();
437      }else
438        recvs_snd[i]=-100;
439      i++;
440    }
441    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
442
443    for (i=0; i<count_requests;i++){
444      if (recvs_snd[i]!=-100)
445        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
446    }
447    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
448   }
449   log_timed_action (action, clock);
450 }
451
452 static void action_barrier(const char *const *action){
453   double clock = smpi_process()->simulated_elapsed();
454   int rank = smpi_process()->index();
455   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
456   extra->type = TRACING_BARRIER;
457   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
458
459   Colls::barrier(MPI_COMM_WORLD);
460
461   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
462   log_timed_action (action, clock);
463 }
464
465 static void action_bcast(const char *const *action)
466 {
467   CHECK_ACTION_PARAMS(action, 1, 2)
468   double size = parse_double(action[2]);
469   double clock = smpi_process()->simulated_elapsed();
470   int root=0;
471   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
472   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
473
474   if(action[3]) {
475     root= atoi(action[3]);
476     if(action[4])
477       MPI_CURRENT_TYPE=decode_datatype(action[4]);
478   }
479
480   int rank = smpi_process()->index();
481   int root_traced = MPI_COMM_WORLD->group()->index(root);
482
483   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
484   extra->type = TRACING_BCAST;
485   extra->send_size = size;
486   extra->root = root_traced;
487   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
488   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
489   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
490
491   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
492
493   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
494   log_timed_action (action, clock);
495 }
496
497 static void action_reduce(const char *const *action)
498 {
499   CHECK_ACTION_PARAMS(action, 2, 2)
500   double comm_size = parse_double(action[2]);
501   double comp_size = parse_double(action[3]);
502   double clock = smpi_process()->simulated_elapsed();
503   int root=0;
504   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
505
506   if(action[4]) {
507     root= atoi(action[4]);
508     if(action[5])
509       MPI_CURRENT_TYPE=decode_datatype(action[5]);
510   }
511
512   int rank = smpi_process()->index();
513   int root_traced = MPI_COMM_WORLD->group()->rank(root);
514   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
515   extra->type = TRACING_REDUCE;
516   extra->send_size = comm_size;
517   extra->comp_size = comp_size;
518   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
519   extra->root = root_traced;
520
521   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
522
523   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
524   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
525   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
526   smpi_execute_flops(comp_size);
527
528   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
529   log_timed_action (action, clock);
530 }
531
532 static void action_allReduce(const char *const *action) {
533   CHECK_ACTION_PARAMS(action, 2, 1)
534   double comm_size = parse_double(action[2]);
535   double comp_size = parse_double(action[3]);
536
537   if(action[4])
538     MPI_CURRENT_TYPE=decode_datatype(action[4]);
539   else
540     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
541
542   double clock = smpi_process()->simulated_elapsed();
543   int rank = smpi_process()->index();
544   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
545   extra->type = TRACING_ALLREDUCE;
546   extra->send_size = comm_size;
547   extra->comp_size = comp_size;
548   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
549   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
550
551   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
552   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
553   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
554   smpi_execute_flops(comp_size);
555
556   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
557   log_timed_action (action, clock);
558 }
559
560 static void action_allToAll(const char *const *action) {
561   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
562   double clock = smpi_process()->simulated_elapsed();
563   int comm_size = MPI_COMM_WORLD->size();
564   int send_size = parse_double(action[2]);
565   int recv_size = parse_double(action[3]);
566   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
567
568   if(action[4] && action[5]) {
569     MPI_CURRENT_TYPE=decode_datatype(action[4]);
570     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
571   }
572   else
573     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
574
575   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
576   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
577
578   int rank = smpi_process()->index();
579   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
580   extra->type = TRACING_ALLTOALL;
581   extra->send_size = send_size;
582   extra->recv_size = recv_size;
583   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
584   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
585
586   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
587
588   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
589
590   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
591   log_timed_action (action, clock);
592 }
593
594 static void action_gather(const char *const *action) {
595   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
596         0 gather 68 68 0 0 0
597       where:
598         1) 68 is the sendcounts
599         2) 68 is the recvcounts
600         3) 0 is the root node
601         4) 0 is the send datatype id, see decode_datatype()
602         5) 0 is the recv datatype id, see decode_datatype()
603   */
604   CHECK_ACTION_PARAMS(action, 2, 3)
605   double clock = smpi_process()->simulated_elapsed();
606   int comm_size = MPI_COMM_WORLD->size();
607   int send_size = parse_double(action[2]);
608   int recv_size = parse_double(action[3]);
609   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
610   if(action[4] && action[5]) {
611     MPI_CURRENT_TYPE=decode_datatype(action[5]);
612     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
613   } else {
614     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
615   }
616   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
617   void *recv = nullptr;
618   int root=0;
619   if(action[4])
620     root=atoi(action[4]);
621   int rank = MPI_COMM_WORLD->rank();
622
623   if(rank==root)
624     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
625
626   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
627   extra->type = TRACING_GATHER;
628   extra->send_size = send_size;
629   extra->recv_size = recv_size;
630   extra->root = root;
631   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
632   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
633
634   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
635
636   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
637
638   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
639   log_timed_action (action, clock);
640 }
641
642 static void action_gatherv(const char *const *action) {
643   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
644        0 gather 68 68 10 10 10 0 0 0
645      where:
646        1) 68 is the sendcount
647        2) 68 10 10 10 is the recvcounts
648        3) 0 is the root node
649        4) 0 is the send datatype id, see decode_datatype()
650        5) 0 is the recv datatype id, see decode_datatype()
651   */
652   double clock = smpi_process()->simulated_elapsed();
653   int comm_size = MPI_COMM_WORLD->size();
654   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
655   int send_size = parse_double(action[2]);
656   int disps[comm_size];
657   int recvcounts[comm_size];
658   int recv_sum=0;
659
660   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
661   if(action[4+comm_size] && action[5+comm_size]) {
662     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
663     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
664   } else
665     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
666
667   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
668   void *recv = nullptr;
669   for(int i=0;i<comm_size;i++) {
670     recvcounts[i] = atoi(action[i+3]);
671     recv_sum=recv_sum+recvcounts[i];
672     disps[i]=0;
673   }
674
675   int root=atoi(action[3+comm_size]);
676   int rank = MPI_COMM_WORLD->rank();
677
678   if(rank==root)
679     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
680
681   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
682   extra->type = TRACING_GATHERV;
683   extra->send_size = send_size;
684   extra->recvcounts= xbt_new(int,comm_size);
685   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
686     extra->recvcounts[i] = recvcounts[i];
687   extra->root = root;
688   extra->num_processes = comm_size;
689   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
690   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
691
692   TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
693
694   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
695
696   TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
697   log_timed_action (action, clock);
698 }
699
700 static void action_reducescatter(const char *const *action) {
701  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
702       0 reduceScatter 275427 275427 275427 204020 11346849 0
703     where:
704       1) The first four values after the name of the action declare the recvcounts array
705       2) The value 11346849 is the amount of instructions
706       3) The last value corresponds to the datatype, see decode_datatype().
707 */
708   double clock = smpi_process()->simulated_elapsed();
709   int comm_size = MPI_COMM_WORLD->size();
710   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
711   int comp_size = parse_double(action[2+comm_size]);
712   int recvcounts[comm_size];
713   int rank = smpi_process()->index();
714   int size = 0;
715   if(action[3+comm_size])
716     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
717   else
718     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
719
720   for(int i=0;i<comm_size;i++) {
721     recvcounts[i] = atoi(action[i+2]);
722     size+=recvcounts[i];
723   }
724
725   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
726   extra->type = TRACING_REDUCE_SCATTER;
727   extra->send_size = 0;
728   extra->recvcounts= xbt_new(int, comm_size);
729   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
730     extra->recvcounts[i] = recvcounts[i];
731   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
732   extra->comp_size = comp_size;
733   extra->num_processes = comm_size;
734
735   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
736
737   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
738   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
739
740   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
741   smpi_execute_flops(comp_size);
742
743   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
744   log_timed_action (action, clock);
745 }
746
747 static void action_allgather(const char *const *action) {
748   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
749         0 allGather 275427 275427
750     where:
751         1) 275427 is the sendcount
752         2) 275427 is the recvcount
753         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
754   */
755   double clock = smpi_process()->simulated_elapsed();
756
757   CHECK_ACTION_PARAMS(action, 2, 2)
758   int sendcount=atoi(action[2]);
759   int recvcount=atoi(action[3]);
760
761   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
762
763   if(action[4] && action[5]) {
764     MPI_CURRENT_TYPE = decode_datatype(action[4]);
765     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
766   } else
767     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
768
769   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
770   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
771
772   int rank = smpi_process()->index();
773   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
774   extra->type = TRACING_ALLGATHER;
775   extra->send_size = sendcount;
776   extra->recv_size= recvcount;
777   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
778   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
779   extra->num_processes = MPI_COMM_WORLD->size();
780
781   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
782
783   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
784
785   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
786   log_timed_action (action, clock);
787 }
788
789 static void action_allgatherv(const char *const *action) {
790   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
791         0 allGatherV 275427 275427 275427 275427 204020
792      where:
793         1) 275427 is the sendcount
794         2) The next four elements declare the recvcounts array
795         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
796   */
797   double clock = smpi_process()->simulated_elapsed();
798
799   int comm_size = MPI_COMM_WORLD->size();
800   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
801   int sendcount=atoi(action[2]);
802   int recvcounts[comm_size];
803   int disps[comm_size];
804   int recv_sum=0;
805   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
806
807   if(action[3+comm_size] && action[4+comm_size]) {
808     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
809     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
810   } else
811     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
812
813   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
814
815   for(int i=0;i<comm_size;i++) {
816     recvcounts[i] = atoi(action[i+3]);
817     recv_sum=recv_sum+recvcounts[i];
818     disps[i] = 0;
819   }
820   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
821
822   int rank = smpi_process()->index();
823   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
824   extra->type = TRACING_ALLGATHERV;
825   extra->send_size = sendcount;
826   extra->recvcounts= xbt_new(int, comm_size);
827   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
828     extra->recvcounts[i] = recvcounts[i];
829   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
830   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
831   extra->num_processes = comm_size;
832
833   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
834
835   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
836                           MPI_COMM_WORLD);
837
838   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
839   log_timed_action (action, clock);
840 }
841
842 static void action_allToAllv(const char *const *action) {
843   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
844         0 allToAllV 100 1 7 10 12 100 1 70 10 5
845      where:
846         1) 100 is the size of the send buffer *sizeof(int),
847         2) 1 7 10 12 is the sendcounts array
848         3) 100*sizeof(int) is the size of the receiver buffer
849         4)  1 70 10 5 is the recvcounts array
850   */
851   double clock = smpi_process()->simulated_elapsed();
852
853   int comm_size = MPI_COMM_WORLD->size();
854   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
855   int sendcounts[comm_size];
856   int recvcounts[comm_size];
857   int senddisps[comm_size];
858   int recvdisps[comm_size];
859
860   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
861
862   int send_buf_size=parse_double(action[2]);
863   int recv_buf_size=parse_double(action[3+comm_size]);
864   if(action[4+2*comm_size] && action[5+2*comm_size]) {
865     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
866     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
867   }
868   else
869     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
870
871   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
872   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
873
874   for(int i=0;i<comm_size;i++) {
875     sendcounts[i] = atoi(action[i+3]);
876     recvcounts[i] = atoi(action[i+4+comm_size]);
877     senddisps[i] = 0;
878     recvdisps[i] = 0;
879   }
880
881   int rank = smpi_process()->index();
882   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
883   extra->type = TRACING_ALLTOALLV;
884   extra->recvcounts= xbt_new(int, comm_size);
885   extra->sendcounts= xbt_new(int, comm_size);
886   extra->num_processes = comm_size;
887
888   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
889     extra->send_size += sendcounts[i];
890     extra->sendcounts[i] = sendcounts[i];
891     extra->recv_size += recvcounts[i];
892     extra->recvcounts[i] = recvcounts[i];
893   }
894   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
895   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
896
897   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
898
899   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
900                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
901
902   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
903   log_timed_action (action, clock);
904 }
905
906 }} // namespace simgrid::smpi
907
908 /** @brief Only initialize the replay, don't do it for real */
909 void smpi_replay_init(int* argc, char*** argv)
910 {
911   simgrid::smpi::Process::init(argc, argv);
912   smpi_process()->mark_as_initialized();
913   smpi_process()->set_replaying(true);
914
915   int rank = smpi_process()->index();
916   TRACE_smpi_init(rank);
917   TRACE_smpi_computing_init(rank);
918   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
919   extra->type = TRACING_INIT;
920   TRACE_smpi_collective_in(rank, -1, "smpi_replay_run_init", extra);
921   TRACE_smpi_collective_out(rank, -1, "smpi_replay_run_init");
922   xbt_replay_action_register("init",       simgrid::smpi::action_init);
923   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
924   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
925   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
926   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
927   xbt_replay_action_register("send",       simgrid::smpi::action_send);
928   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
929   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
930   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
931   xbt_replay_action_register("test",       simgrid::smpi::action_test);
932   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
933   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
934   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
935   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
936   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
937   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
938   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
939   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
940   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
941   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
942   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
943   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
944   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
945   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
946
947   //if we have a delayed start, sleep here.
948   if(*argc>2){
949     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
950     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
951     smpi_execute_flops(value);
952   } else {
953     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
954     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
955     smpi_execute_flops(0.0);
956   }
957 }
958
959 /** @brief actually run the replay after initialization */
960 void smpi_replay_main(int* argc, char*** argv)
961 {
962   simgrid::xbt::replay_runner(*argc, *argv);
963
964   /* and now, finalize everything */
965   /* One active process will stop. Decrease the counter*/
966   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
967   if (not get_reqq_self()->empty()) {
968     unsigned int count_requests=get_reqq_self()->size();
969     MPI_Request requests[count_requests];
970     MPI_Status status[count_requests];
971     unsigned int i=0;
972
973     for (auto req: *get_reqq_self()){
974       requests[i] = req;
975       i++;
976     }
977     simgrid::smpi::Request::waitall(count_requests, requests, status);
978   }
979   delete get_reqq_self();
980   active_processes--;
981
982   if(active_processes==0){
983     /* Last process alive speaking: end the simulated timer */
984     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
985     xbt_free(sendbuffer);
986     xbt_free(recvbuffer);
987   }
988
989   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
990   extra_fin->type = TRACING_FINALIZE;
991   TRACE_smpi_collective_in(smpi_process()->index(), -1, "smpi_replay_run_finalize", extra_fin);
992
993   smpi_process()->finalize();
994
995   TRACE_smpi_collective_out(smpi_process()->index(), -1, "smpi_replay_run_finalize");
996   TRACE_smpi_finalize(smpi_process()->index());
997 }
998
999 /** @brief chain a replay initialization and a replay start */
1000 void smpi_replay_run(int* argc, char*** argv)
1001 {
1002   smpi_replay_init(argc, argv);
1003   smpi_replay_main(argc, argv);
1004 }