Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
WIP stop using const char* in C++ layers
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <xbt.h>
9 #include <xbt/replay.h>
10 #include <unordered_map>
11 #include <vector>
12
13 #define KEY_SIZE (sizeof(int) * 2 + 1)
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
16
17 int communicator_size = 0;
18 static int active_processes = 0;
19 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
20
21 MPI_Datatype MPI_DEFAULT_TYPE;
22 MPI_Datatype MPI_CURRENT_TYPE;
23
24 static int sendbuffer_size=0;
25 char* sendbuffer=nullptr;
26 static int recvbuffer_size=0;
27 char* recvbuffer=nullptr;
28
29 static void log_timed_action (const char *const *action, double clock){
30   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
31     char *name = xbt_str_join_array(action, " ");
32     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
33     xbt_free(name);
34   }
35 }
36
37 static std::vector<MPI_Request>* get_reqq_self()
38 {
39   return reqq.at(smpi_process_index());
40 }
41
42 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
43 {
44    reqq.insert({smpi_process_index(), mpi_request});
45 }
46
47 //allocate a single buffer for all sends, growing it if needed
48 void* smpi_get_tmp_sendbuffer(int size)
49 {
50   if (!smpi_process_get_replaying())
51     return xbt_malloc(size);
52   if (sendbuffer_size<size){
53     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
54     sendbuffer_size=size;
55   }
56   return sendbuffer;
57 }
58
59 //allocate a single buffer for all recv
60 void* smpi_get_tmp_recvbuffer(int size){
61   if (!smpi_process_get_replaying())
62     return xbt_malloc(size);
63   if (recvbuffer_size<size){
64     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
65     recvbuffer_size=size;
66   }
67   return recvbuffer;
68 }
69
70 void smpi_free_tmp_buffer(void* buf){
71   if (!smpi_process_get_replaying())
72     xbt_free(buf);
73 }
74
75 /* Helper function */
76 static double parse_double(const char *string)
77 {
78   char *endptr;
79   double value = strtod(string, &endptr);
80   if (*endptr != '\0')
81     THROWF(unknown_error, 0, "%s is not a double", string);
82   return value;
83 }
84
85 static MPI_Datatype decode_datatype(const char *const action)
86 {
87   switch(atoi(action)) {
88     case 0:
89       MPI_CURRENT_TYPE=MPI_DOUBLE;
90       break;
91     case 1:
92       MPI_CURRENT_TYPE=MPI_INT;
93       break;
94     case 2:
95       MPI_CURRENT_TYPE=MPI_CHAR;
96       break;
97     case 3:
98       MPI_CURRENT_TYPE=MPI_SHORT;
99       break;
100     case 4:
101       MPI_CURRENT_TYPE=MPI_LONG;
102       break;
103     case 5:
104       MPI_CURRENT_TYPE=MPI_FLOAT;
105       break;
106     case 6:
107       MPI_CURRENT_TYPE=MPI_BYTE;
108       break;
109     default:
110       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
111   }
112    return MPI_CURRENT_TYPE;
113 }
114
115 const char* encode_datatype(MPI_Datatype datatype, int* known)
116 {
117   //default type for output is set to MPI_BYTE
118   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
119   if(known!=nullptr)
120     *known=1;
121   if (datatype==MPI_BYTE)
122       return "";
123   if(datatype==MPI_DOUBLE)
124       return "0";
125   if(datatype==MPI_INT)
126       return "1";
127   if(datatype==MPI_CHAR)
128       return "2";
129   if(datatype==MPI_SHORT)
130       return "3";
131   if(datatype==MPI_LONG)
132     return "4";
133   if(datatype==MPI_FLOAT)
134       return "5";
135   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
136   if(known!=nullptr)
137     *known=0;
138   // default - not implemented.
139   // do not warn here as we pass in this function even for other trace formats
140   return "-1";
141 }
142
143 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
144     int i=0;\
145     while(action[i]!=nullptr)\
146      i++;\
147     if(i<mandatory+2)                                           \
148     THROWF(arg_error, 0, "%s replay failed.\n" \
149           "%d items were given on the line. First two should be process_id and action.  " \
150           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
151           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
152   }
153
154 static void action_init(const char *const *action)
155 {
156   XBT_DEBUG("Initialize the counters");
157   CHECK_ACTION_PARAMS(action, 0, 1)
158   if(action[2]) 
159     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
160   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
161
162   /* start a simulated timer */
163   smpi_process_simulated_start();
164   /*initialize the number of active processes */
165   active_processes = smpi_process_count();
166
167   set_reqq_self(new std::vector<MPI_Request>);
168 }
169
170 static void action_finalize(const char *const *action)
171 {
172   /* do nothing */
173 }
174
175 static void action_comm_size(const char *const *action)
176 {
177   communicator_size = parse_double(action[2]);
178   log_timed_action (action, smpi_process_simulated_elapsed());
179 }
180
181 static void action_comm_split(const char *const *action)
182 {
183   log_timed_action (action, smpi_process_simulated_elapsed());
184 }
185
186 static void action_comm_dup(const char *const *action)
187 {
188   log_timed_action (action, smpi_process_simulated_elapsed());
189 }
190
191 static void action_compute(const char *const *action)
192 {
193   CHECK_ACTION_PARAMS(action, 1, 0)
194   double clock = smpi_process_simulated_elapsed();
195   double flops= parse_double(action[2]);
196   int rank = smpi_process_index();
197   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
198   extra->type=TRACING_COMPUTING;
199   extra->comp_size=flops;
200   TRACE_smpi_computing_in(rank, extra);
201
202   smpi_execute_flops(flops);
203
204   TRACE_smpi_computing_out(rank);
205   log_timed_action (action, clock);
206 }
207
208 static void action_send(const char *const *action)
209 {
210   CHECK_ACTION_PARAMS(action, 2, 1)
211   int to = atoi(action[2]);
212   double size=parse_double(action[3]);
213   double clock = smpi_process_simulated_elapsed();
214
215   if(action[4])
216     MPI_CURRENT_TYPE=decode_datatype(action[4]);
217   else
218     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
219
220   int rank = smpi_process_index();
221
222   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
223   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
224   extra->type = TRACING_SEND;
225   extra->send_size = size;
226   extra->src = rank;
227   extra->dst = dst_traced;
228   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
229   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
230   if (!TRACE_smpi_view_internals())
231     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
232
233   smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
234
235   log_timed_action (action, clock);
236
237   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
238 }
239
240 static void action_Isend(const char *const *action)
241 {
242   CHECK_ACTION_PARAMS(action, 2, 1)
243   int to = atoi(action[2]);
244   double size=parse_double(action[3]);
245   double clock = smpi_process_simulated_elapsed();
246
247   if(action[4]) 
248     MPI_CURRENT_TYPE=decode_datatype(action[4]);
249   else 
250     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
251
252   int rank = smpi_process_index();
253   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
254   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
255   extra->type = TRACING_ISEND;
256   extra->send_size = size;
257   extra->src = rank;
258   extra->dst = dst_traced;
259   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
260   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
261   if (!TRACE_smpi_view_internals())
262     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
263
264   MPI_Request request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
265
266   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
267   request->send = 1;
268
269   get_reqq_self()->push_back(request);
270
271   log_timed_action (action, clock);
272 }
273
274 static void action_recv(const char *const *action) {
275   CHECK_ACTION_PARAMS(action, 2, 1)
276   int from = atoi(action[2]);
277   double size=parse_double(action[3]);
278   double clock = smpi_process_simulated_elapsed();
279   MPI_Status status;
280
281   if(action[4]) 
282     MPI_CURRENT_TYPE=decode_datatype(action[4]);
283   else 
284     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
285
286   int rank = smpi_process_index();
287   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
288
289   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
290   extra->type = TRACING_RECV;
291   extra->send_size = size;
292   extra->src = src_traced;
293   extra->dst = rank;
294   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
295   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
296
297   //unknown size from the receiver point of view
298   if(size<=0.0){
299     smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
300     size=status.count;
301   }
302
303   smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
304
305   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
306   if (!TRACE_smpi_view_internals()) {
307     TRACE_smpi_recv(rank, src_traced, rank, 0);
308   }
309
310   log_timed_action (action, clock);
311 }
312
313 static void action_Irecv(const char *const *action)
314 {
315   CHECK_ACTION_PARAMS(action, 2, 1)
316   int from = atoi(action[2]);
317   double size=parse_double(action[3]);
318   double clock = smpi_process_simulated_elapsed();
319
320   if(action[4]) 
321     MPI_CURRENT_TYPE=decode_datatype(action[4]);
322   else 
323     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
324
325   int rank = smpi_process_index();
326   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
327   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328   extra->type = TRACING_IRECV;
329   extra->send_size = size;
330   extra->src = src_traced;
331   extra->dst = rank;
332   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
333   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
334   MPI_Status status;
335   //unknow size from the receiver pov
336   if(size<=0.0){
337       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
338       size=status.count;
339   }
340
341   MPI_Request request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
342
343   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
344   request->recv = 1;
345   get_reqq_self()->push_back(request);
346
347   log_timed_action (action, clock);
348 }
349
350 static void action_test(const char *const *action){
351   CHECK_ACTION_PARAMS(action, 0, 0)
352   double clock = smpi_process_simulated_elapsed();
353   MPI_Status status;
354
355   MPI_Request request = get_reqq_self()->back();
356   get_reqq_self()->pop_back();
357   //if request is null here, this may mean that a previous test has succeeded 
358   //Different times in traced application and replayed version may lead to this 
359   //In this case, ignore the extra calls.
360   if(request!=nullptr){
361     int rank = smpi_process_index();
362     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
363     extra->type=TRACING_TEST;
364     TRACE_smpi_testing_in(rank, extra);
365
366     int flag = smpi_mpi_test(&request, &status);
367
368     XBT_DEBUG("MPI_Test result: %d", flag);
369     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
370     get_reqq_self()->push_back(request);
371
372     TRACE_smpi_testing_out(rank);
373   }
374   log_timed_action (action, clock);
375 }
376
377 static void action_wait(const char *const *action){
378   CHECK_ACTION_PARAMS(action, 0, 0)
379   double clock = smpi_process_simulated_elapsed();
380   MPI_Status status;
381
382   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
383       xbt_str_join_array(action," "));
384   MPI_Request request = get_reqq_self()->back();
385   get_reqq_self()->pop_back();
386
387   if (request==nullptr){
388     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
389     return;
390   }
391
392   int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
393
394   MPI_Group group = smpi_comm_group(request->comm);
395   int src_traced = smpi_group_rank(group, request->src);
396   int dst_traced = smpi_group_rank(group, request->dst);
397   int is_wait_for_receive = request->recv;
398   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
399   extra->type = TRACING_WAIT;
400   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
401
402   smpi_mpi_wait(&request, &status);
403
404   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
405   if (is_wait_for_receive)
406     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
407   log_timed_action (action, clock);
408 }
409
410 static void action_waitall(const char *const *action){
411   CHECK_ACTION_PARAMS(action, 0, 0)
412   double clock = smpi_process_simulated_elapsed();
413   unsigned int count_requests=get_reqq_self()->size();
414
415   if (count_requests>0) {
416     MPI_Status status[count_requests];
417
418    int rank_traced = smpi_process_index();
419    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
420    extra->type = TRACING_WAITALL;
421    extra->send_size=count_requests;
422    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
423    int recvs_snd[count_requests];
424    int recvs_rcv[count_requests];
425    unsigned int i=0;
426    for (auto req : *(get_reqq_self())){
427      if (req && req->recv){
428        recvs_snd[i]=req->src;
429        recvs_rcv[i]=req->dst;
430      }else
431        recvs_snd[i]=-100;
432      i++;
433    }
434    smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
435
436    for (i=0; i<count_requests;i++){
437      if (recvs_snd[i]!=-100)
438        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
439    }
440    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
441   }
442   log_timed_action (action, clock);
443 }
444
445 static void action_barrier(const char *const *action){
446   double clock = smpi_process_simulated_elapsed();
447   int rank = smpi_process_index();
448   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
449   extra->type = TRACING_BARRIER;
450   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
451
452   mpi_coll_barrier_fun(MPI_COMM_WORLD);
453
454   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
455   log_timed_action (action, clock);
456 }
457
458 static void action_bcast(const char *const *action)
459 {
460   CHECK_ACTION_PARAMS(action, 1, 2)
461   double size = parse_double(action[2]);
462   double clock = smpi_process_simulated_elapsed();
463   int root=0;
464   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
465   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
466
467   if(action[3]) {
468     root= atoi(action[3]);
469     if(action[4])
470       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
471   }
472
473   int rank = smpi_process_index();
474   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
475
476   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
477   extra->type = TRACING_BCAST;
478   extra->send_size = size;
479   extra->root = root_traced;
480   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
481   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
482   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
483
484   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
485
486   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
487   log_timed_action (action, clock);
488 }
489
490 static void action_reduce(const char *const *action)
491 {
492   CHECK_ACTION_PARAMS(action, 2, 2)
493   double comm_size = parse_double(action[2]);
494   double comp_size = parse_double(action[3]);
495   double clock = smpi_process_simulated_elapsed();
496   int root=0;
497   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498
499   if(action[4]) {
500     root= atoi(action[4]);
501     if(action[5])
502       MPI_CURRENT_TYPE=decode_datatype(action[5]);
503   }
504
505   int rank = smpi_process_index();
506   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
507   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
508   extra->type = TRACING_REDUCE;
509   extra->send_size = comm_size;
510   extra->comp_size = comp_size;
511   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
512   extra->root = root_traced;
513
514   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
515
516   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
517   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
518   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
519   smpi_execute_flops(comp_size);
520
521   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
522   log_timed_action (action, clock);
523 }
524
525 static void action_allReduce(const char *const *action) {
526   CHECK_ACTION_PARAMS(action, 2, 1)
527   double comm_size = parse_double(action[2]);
528   double comp_size = parse_double(action[3]);
529
530   if(action[4])
531     MPI_CURRENT_TYPE=decode_datatype(action[4]);
532   else
533     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
534
535   double clock = smpi_process_simulated_elapsed();
536   int rank = smpi_process_index();
537   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538   extra->type = TRACING_ALLREDUCE;
539   extra->send_size = comm_size;
540   extra->comp_size = comp_size;
541   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
542   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
543
544   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
545   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
546   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
547   smpi_execute_flops(comp_size);
548
549   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
550   log_timed_action (action, clock);
551 }
552
553 static void action_allToAll(const char *const *action) {
554   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
555   double clock = smpi_process_simulated_elapsed();
556   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
557   int send_size = parse_double(action[2]);
558   int recv_size = parse_double(action[3]);
559   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
560
561   if(action[4] && action[5]) {
562     MPI_CURRENT_TYPE=decode_datatype(action[4]);
563     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
564   }
565   else
566     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
567
568   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
569   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
570
571   int rank = smpi_process_index();
572   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573   extra->type = TRACING_ALLTOALL;
574   extra->send_size = send_size;
575   extra->recv_size = recv_size;
576   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
577   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
578
579   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
580
581   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
582
583   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
584   log_timed_action (action, clock);
585 }
586
587 static void action_gather(const char *const *action) {
588   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
589         0 gather 68 68 0 0 0
590       where:
591         1) 68 is the sendcounts
592         2) 68 is the recvcounts
593         3) 0 is the root node
594         4) 0 is the send datatype id, see decode_datatype()
595         5) 0 is the recv datatype id, see decode_datatype()
596   */
597   CHECK_ACTION_PARAMS(action, 2, 3)
598   double clock = smpi_process_simulated_elapsed();
599   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
600   int send_size = parse_double(action[2]);
601   int recv_size = parse_double(action[3]);
602   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
603   if(action[4] && action[5]) {
604     MPI_CURRENT_TYPE=decode_datatype(action[5]);
605     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
606   } else {
607     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
608   }
609   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
610   void *recv = nullptr;
611   int root=0;
612   if(action[4])
613     root=atoi(action[4]);
614   int rank = smpi_comm_rank(MPI_COMM_WORLD);
615
616   if(rank==root)
617     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
618
619   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
620   extra->type = TRACING_GATHER;
621   extra->send_size = send_size;
622   extra->recv_size = recv_size;
623   extra->root = root;
624   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
625   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
626
627   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
628
629   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
630
631   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
632   log_timed_action (action, clock);
633 }
634
635 static void action_gatherv(const char *const *action) {
636   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
637        0 gather 68 68 10 10 10 0 0 0
638      where:
639        1) 68 is the sendcount
640        2) 68 10 10 10 is the recvcounts
641        3) 0 is the root node
642        4) 0 is the send datatype id, see decode_datatype()
643        5) 0 is the recv datatype id, see decode_datatype()
644   */
645   double clock = smpi_process_simulated_elapsed();
646   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
647   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
648   int send_size = parse_double(action[2]);
649   int disps[comm_size];
650   int recvcounts[comm_size];
651   int recv_sum=0;
652
653   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
654   if(action[4+comm_size] && action[5+comm_size]) {
655     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
656     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
657   } else
658     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
659
660   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
661   void *recv = nullptr;
662   for(int i=0;i<comm_size;i++) {
663     recvcounts[i] = atoi(action[i+3]);
664     recv_sum=recv_sum+recvcounts[i];
665     disps[i]=0;
666   }
667
668   int root=atoi(action[3+comm_size]);
669   int rank = smpi_comm_rank(MPI_COMM_WORLD);
670
671   if(rank==root)
672     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
673
674   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
675   extra->type = TRACING_GATHERV;
676   extra->send_size = send_size;
677   extra->recvcounts= xbt_new(int,comm_size);
678   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
679     extra->recvcounts[i] = recvcounts[i];
680   extra->root = root;
681   extra->num_processes = comm_size;
682   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
683   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
684
685   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
686
687   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
688
689   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
690   log_timed_action (action, clock);
691 }
692
693 static void action_reducescatter(const char *const *action) {
694  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
695       0 reduceScatter 275427 275427 275427 204020 11346849 0
696     where:
697       1) The first four values after the name of the action declare the recvcounts array
698       2) The value 11346849 is the amount of instructions
699       3) The last value corresponds to the datatype, see decode_datatype().
700 */
701   double clock = smpi_process_simulated_elapsed();
702   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
703   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
704   int comp_size = parse_double(action[2+comm_size]);
705   int recvcounts[comm_size];
706   int rank = smpi_process_index();
707   int size = 0;
708   if(action[3+comm_size])
709     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
710   else
711     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
712
713   for(int i=0;i<comm_size;i++) {
714     recvcounts[i] = atoi(action[i+2]);
715     size+=recvcounts[i];
716   }
717
718   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
719   extra->type = TRACING_REDUCE_SCATTER;
720   extra->send_size = 0;
721   extra->recvcounts= xbt_new(int, comm_size);
722   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
723     extra->recvcounts[i] = recvcounts[i];
724   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
725   extra->comp_size = comp_size;
726   extra->num_processes = comm_size;
727
728   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
729
730   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
731   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
732
733   mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
734   smpi_execute_flops(comp_size);
735
736   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
737   log_timed_action (action, clock);
738 }
739
740 static void action_allgather(const char *const *action) {
741   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
742         0 allGather 275427 275427
743     where:
744         1) 275427 is the sendcount
745         2) 275427 is the recvcount
746         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
747   */
748   double clock = smpi_process_simulated_elapsed();
749
750   CHECK_ACTION_PARAMS(action, 2, 2)
751   int sendcount=atoi(action[2]); 
752   int recvcount=atoi(action[3]); 
753
754   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
755
756   if(action[4] && action[5]) {
757     MPI_CURRENT_TYPE = decode_datatype(action[4]);
758     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
759   } else
760     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
761
762   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
763   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
764
765   int rank = smpi_process_index();
766   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
767   extra->type = TRACING_ALLGATHER;
768   extra->send_size = sendcount;
769   extra->recv_size= recvcount;
770   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
771   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
772   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
773
774   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
775
776   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
777
778   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
779   log_timed_action (action, clock);
780 }
781
782 static void action_allgatherv(const char *const *action) {
783   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
784         0 allGatherV 275427 275427 275427 275427 204020
785      where:
786         1) 275427 is the sendcount
787         2) The next four elements declare the recvcounts array
788         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
789   */
790   double clock = smpi_process_simulated_elapsed();
791
792   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
793   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
794   int sendcount=atoi(action[2]);
795   int recvcounts[comm_size];
796   int disps[comm_size];
797   int recv_sum=0;
798   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
799
800   if(action[3+comm_size] && action[4+comm_size]) {
801     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
802     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
803   } else
804     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
805
806   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
807
808   for(int i=0;i<comm_size;i++) {
809     recvcounts[i] = atoi(action[i+3]);
810     recv_sum=recv_sum+recvcounts[i];
811     disps[i] = 0;
812   }
813   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
814
815   int rank = smpi_process_index();
816   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
817   extra->type = TRACING_ALLGATHERV;
818   extra->send_size = sendcount;
819   extra->recvcounts= xbt_new(int, comm_size);
820   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
821     extra->recvcounts[i] = recvcounts[i];
822   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
823   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
824   extra->num_processes = comm_size;
825
826   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
827
828   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
829                           MPI_COMM_WORLD);
830
831   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
832   log_timed_action (action, clock);
833 }
834
835 static void action_allToAllv(const char *const *action) {
836   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
837         0 allToAllV 100 1 7 10 12 100 1 70 10 5
838      where:
839         1) 100 is the size of the send buffer *sizeof(int),
840         2) 1 7 10 12 is the sendcounts array
841         3) 100*sizeof(int) is the size of the receiver buffer
842         4)  1 70 10 5 is the recvcounts array
843   */
844   double clock = smpi_process_simulated_elapsed();
845
846   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
847   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
848   int sendcounts[comm_size];
849   int recvcounts[comm_size];
850   int senddisps[comm_size];
851   int recvdisps[comm_size];
852
853   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
854
855   int send_buf_size=parse_double(action[2]);
856   int recv_buf_size=parse_double(action[3+comm_size]);
857   if(action[4+2*comm_size] && action[5+2*comm_size]) {
858     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
859     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
860   }
861   else
862     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
863
864   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
865   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
866
867   for(int i=0;i<comm_size;i++) {
868     sendcounts[i] = atoi(action[i+3]);
869     recvcounts[i] = atoi(action[i+4+comm_size]);
870     senddisps[i] = 0;
871     recvdisps[i] = 0;
872   }
873
874   int rank = smpi_process_index();
875   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
876   extra->type = TRACING_ALLTOALLV;
877   extra->recvcounts= xbt_new(int, comm_size);
878   extra->sendcounts= xbt_new(int, comm_size);
879   extra->num_processes = comm_size;
880
881   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
882     extra->send_size += sendcounts[i];
883     extra->sendcounts[i] = sendcounts[i];
884     extra->recv_size += recvcounts[i];
885     extra->recvcounts[i] = recvcounts[i];
886   }
887   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
888   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
889
890   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
891
892   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
893                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
894
895   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
896   log_timed_action (action, clock);
897 }
898
899 void smpi_replay_run(int *argc, char***argv){
900   /* First initializes everything */
901   smpi_process_init(argc, argv);
902   smpi_process_mark_as_initialized();
903   smpi_process_set_replaying(true);
904
905   int rank = smpi_process_index();
906   TRACE_smpi_init(rank);
907   TRACE_smpi_computing_init(rank);
908   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
909   extra->type = TRACING_INIT;
910   char *operation =bprintf("%s_init",__FUNCTION__);
911   TRACE_smpi_collective_in(rank, -1, operation, extra);
912   TRACE_smpi_collective_out(rank, -1, operation);
913   xbt_free(operation);
914
915   if (_xbt_replay_action_init()==0) {
916     xbt_replay_action_register("init",       action_init);
917     xbt_replay_action_register("finalize",   action_finalize);
918     xbt_replay_action_register("comm_size",  action_comm_size);
919     xbt_replay_action_register("comm_split", action_comm_split);
920     xbt_replay_action_register("comm_dup",   action_comm_dup);
921     xbt_replay_action_register("send",       action_send);
922     xbt_replay_action_register("Isend",      action_Isend);
923     xbt_replay_action_register("recv",       action_recv);
924     xbt_replay_action_register("Irecv",      action_Irecv);
925     xbt_replay_action_register("test",       action_test);
926     xbt_replay_action_register("wait",       action_wait);
927     xbt_replay_action_register("waitAll",    action_waitall);
928     xbt_replay_action_register("barrier",    action_barrier);
929     xbt_replay_action_register("bcast",      action_bcast);
930     xbt_replay_action_register("reduce",     action_reduce);
931     xbt_replay_action_register("allReduce",  action_allReduce);
932     xbt_replay_action_register("allToAll",   action_allToAll);
933     xbt_replay_action_register("allToAllV",  action_allToAllv);
934     xbt_replay_action_register("gather",  action_gather);
935     xbt_replay_action_register("gatherV",  action_gatherv);
936     xbt_replay_action_register("allGather",  action_allgather);
937     xbt_replay_action_register("allGatherV",  action_allgatherv);
938     xbt_replay_action_register("reduceScatter",  action_reducescatter);
939     xbt_replay_action_register("compute",    action_compute);
940   }
941
942   //if we have a delayed start, sleep here.
943   if(*argc>2){
944     char *endptr;
945     double value = strtod((*argv)[2], &endptr);
946     if (*endptr != '\0')
947       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
948     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
949     smpi_execute_flops(value);
950   } else {
951     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
952     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
953     smpi_execute_flops(0.0);
954   }
955
956   /* Actually run the replay */
957   xbt_replay_action_runner(*argc, *argv);
958
959   /* and now, finalize everything */
960   /* One active process will stop. Decrease the counter*/
961   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
962   if (!get_reqq_self()->empty()){
963     unsigned int count_requests=get_reqq_self()->size();
964     MPI_Request requests[count_requests];
965     MPI_Status status[count_requests];
966     unsigned int i=0;
967
968     for (auto req: *get_reqq_self()){
969       requests[i] = req;
970       i++;
971     }
972     smpi_mpi_waitall(count_requests, requests, status);
973   }
974   active_processes--;
975
976   if(active_processes==0){
977     /* Last process alive speaking: end the simulated timer */
978     XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
979     _xbt_replay_action_exit();
980     xbt_free(sendbuffer);
981     xbt_free(recvbuffer);
982   }
983
984   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
985   extra_fin->type = TRACING_FINALIZE;
986   operation =bprintf("%s_finalize",__FUNCTION__);
987   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
988
989   smpi_process_finalize();
990
991   TRACE_smpi_collective_out(rank, -1, operation);
992   TRACE_smpi_finalize(smpi_process_index());
993   smpi_process_destroy();
994   xbt_free(operation);
995 }