Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Avoid potential invalid reads
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <xbt.h>
9 #include <xbt/replay.h>
10 #include <unordered_map>
11 #include <vector>
12
13 #define KEY_SIZE (sizeof(int) * 2 + 1)
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
16
17 int communicator_size = 0;
18 static int active_processes = 0;
19 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
20
21 MPI_Datatype MPI_DEFAULT_TYPE;
22 MPI_Datatype MPI_CURRENT_TYPE;
23
24 static int sendbuffer_size=0;
25 char* sendbuffer=nullptr;
26 static int recvbuffer_size=0;
27 char* recvbuffer=nullptr;
28
29 static void log_timed_action (const char *const *action, double clock){
30   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
31     char *name = xbt_str_join_array(action, " ");
32     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
33     xbt_free(name);
34   }
35 }
36
37 static std::vector<MPI_Request>* get_reqq_self()
38 {
39   return reqq.at(smpi_process_index());
40 }
41
42 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
43 {
44    reqq.insert({smpi_process_index(), mpi_request});
45 }
46
47 //allocate a single buffer for all sends, growing it if needed
48 void* smpi_get_tmp_sendbuffer(int size)
49 {
50   if (!smpi_process_get_replaying())
51     return xbt_malloc(size);
52   if (sendbuffer_size<size){
53     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
54     sendbuffer_size=size;
55   }
56   return sendbuffer;
57 }
58
59 //allocate a single buffer for all recv
60 void* smpi_get_tmp_recvbuffer(int size){
61   if (!smpi_process_get_replaying())
62     return xbt_malloc(size);
63   if (recvbuffer_size<size){
64     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
65     recvbuffer_size=size;
66   }
67   return recvbuffer;
68 }
69
70 void smpi_free_tmp_buffer(void* buf){
71   if (!smpi_process_get_replaying())
72     xbt_free(buf);
73 }
74
75 /* Helper function */
76 static double parse_double(const char *string)
77 {
78   char *endptr;
79   double value = strtod(string, &endptr);
80   if (*endptr != '\0')
81     THROWF(unknown_error, 0, "%s is not a double", string);
82   return value;
83 }
84
85 static MPI_Datatype decode_datatype(const char *const action)
86 {
87   switch(atoi(action)) {
88     case 0:
89       MPI_CURRENT_TYPE=MPI_DOUBLE;
90       break;
91     case 1:
92       MPI_CURRENT_TYPE=MPI_INT;
93       break;
94     case 2:
95       MPI_CURRENT_TYPE=MPI_CHAR;
96       break;
97     case 3:
98       MPI_CURRENT_TYPE=MPI_SHORT;
99       break;
100     case 4:
101       MPI_CURRENT_TYPE=MPI_LONG;
102       break;
103     case 5:
104       MPI_CURRENT_TYPE=MPI_FLOAT;
105       break;
106     case 6:
107       MPI_CURRENT_TYPE=MPI_BYTE;
108       break;
109     default:
110       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
111   }
112    return MPI_CURRENT_TYPE;
113 }
114
115 const char* encode_datatype(MPI_Datatype datatype, int* known)
116 {
117   //default type for output is set to MPI_BYTE
118   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
119   if(known!=nullptr)
120     *known=1;
121   if (datatype==MPI_BYTE)
122       return "";
123   if(datatype==MPI_DOUBLE)
124       return "0";
125   if(datatype==MPI_INT)
126       return "1";
127   if(datatype==MPI_CHAR)
128       return "2";
129   if(datatype==MPI_SHORT)
130       return "3";
131   if(datatype==MPI_LONG)
132     return "4";
133   if(datatype==MPI_FLOAT)
134       return "5";
135   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
136   if(known!=nullptr)
137     *known=0;
138   // default - not implemented.
139   // do not warn here as we pass in this function even for other trace formats
140   return "-1";
141 }
142
143 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
144     int i=0;\
145     while(action[i]!=nullptr)\
146      i++;\
147     if(i<mandatory+2)                                           \
148     THROWF(arg_error, 0, "%s replay failed.\n" \
149           "%d items were given on the line. First two should be process_id and action.  " \
150           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
151           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
152   }
153
154 static void action_init(const char *const *action)
155 {
156   XBT_DEBUG("Initialize the counters");
157   CHECK_ACTION_PARAMS(action, 0, 1)
158   if(action[2]) 
159     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
160   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
161
162   /* start a simulated timer */
163   smpi_process_simulated_start();
164   /*initialize the number of active processes */
165   active_processes = smpi_process_count();
166
167   set_reqq_self(new std::vector<MPI_Request>);
168 }
169
170 static void action_finalize(const char *const *action)
171 {
172   /* do nothing */
173 }
174
175 static void action_comm_size(const char *const *action)
176 {
177   communicator_size = parse_double(action[2]);
178   log_timed_action (action, smpi_process_simulated_elapsed());
179 }
180
181 static void action_comm_split(const char *const *action)
182 {
183   log_timed_action (action, smpi_process_simulated_elapsed());
184 }
185
186 static void action_comm_dup(const char *const *action)
187 {
188   log_timed_action (action, smpi_process_simulated_elapsed());
189 }
190
191 static void action_compute(const char *const *action)
192 {
193   CHECK_ACTION_PARAMS(action, 1, 0)
194   double clock = smpi_process_simulated_elapsed();
195   double flops= parse_double(action[2]);
196   int rank = smpi_process_index();
197   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
198   extra->type=TRACING_COMPUTING;
199   extra->comp_size=flops;
200   TRACE_smpi_computing_in(rank, extra);
201
202   smpi_execute_flops(flops);
203
204   TRACE_smpi_computing_out(rank);
205   log_timed_action (action, clock);
206 }
207
208 static void action_send(const char *const *action)
209 {
210   CHECK_ACTION_PARAMS(action, 2, 1)
211   int to = atoi(action[2]);
212   double size=parse_double(action[3]);
213   double clock = smpi_process_simulated_elapsed();
214
215   if(action[4])
216     MPI_CURRENT_TYPE=decode_datatype(action[4]);
217   else
218     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
219
220   int rank = smpi_process_index();
221
222   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
223   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
224   extra->type = TRACING_SEND;
225   extra->send_size = size;
226   extra->src = rank;
227   extra->dst = dst_traced;
228   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
229   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
230   if (!TRACE_smpi_view_internals())
231     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
232
233   smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
234
235   log_timed_action (action, clock);
236
237   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
238 }
239
240 static void action_Isend(const char *const *action)
241 {
242   CHECK_ACTION_PARAMS(action, 2, 1)
243   int to = atoi(action[2]);
244   double size=parse_double(action[3]);
245   double clock = smpi_process_simulated_elapsed();
246
247   if(action[4]) 
248     MPI_CURRENT_TYPE=decode_datatype(action[4]);
249   else 
250     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
251
252   int rank = smpi_process_index();
253   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
254   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
255   extra->type = TRACING_ISEND;
256   extra->send_size = size;
257   extra->src = rank;
258   extra->dst = dst_traced;
259   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
260   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
261   if (!TRACE_smpi_view_internals())
262     TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
263
264   MPI_Request request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
265
266   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
267   request->send = 1;
268
269   get_reqq_self()->push_back(request);
270
271   log_timed_action (action, clock);
272 }
273
274 static void action_recv(const char *const *action) {
275   CHECK_ACTION_PARAMS(action, 2, 1)
276   int from = atoi(action[2]);
277   double size=parse_double(action[3]);
278   double clock = smpi_process_simulated_elapsed();
279   MPI_Status status;
280
281   if(action[4]) 
282     MPI_CURRENT_TYPE=decode_datatype(action[4]);
283   else 
284     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
285
286   int rank = smpi_process_index();
287   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
288
289   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
290   extra->type = TRACING_RECV;
291   extra->send_size = size;
292   extra->src = src_traced;
293   extra->dst = rank;
294   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
295   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
296
297   //unknown size from the receiver point of view
298   if(size<=0.0){
299     smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
300     size=status.count;
301   }
302
303   smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
304
305   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
306   if (!TRACE_smpi_view_internals()) {
307     TRACE_smpi_recv(rank, src_traced, rank, 0);
308   }
309
310   log_timed_action (action, clock);
311 }
312
313 static void action_Irecv(const char *const *action)
314 {
315   CHECK_ACTION_PARAMS(action, 2, 1)
316   int from = atoi(action[2]);
317   double size=parse_double(action[3]);
318   double clock = smpi_process_simulated_elapsed();
319
320   if(action[4]) 
321     MPI_CURRENT_TYPE=decode_datatype(action[4]);
322   else 
323     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
324
325   int rank = smpi_process_index();
326   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
327   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
328   extra->type = TRACING_IRECV;
329   extra->send_size = size;
330   extra->src = src_traced;
331   extra->dst = rank;
332   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
333   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
334   MPI_Status status;
335   //unknow size from the receiver pov
336   if(size<=0.0){
337       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
338       size=status.count;
339   }
340
341   MPI_Request request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
342
343   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
344   request->recv = 1;
345   get_reqq_self()->push_back(request);
346
347   log_timed_action (action, clock);
348 }
349
350 static void action_test(const char *const *action){
351   CHECK_ACTION_PARAMS(action, 0, 0)
352   double clock = smpi_process_simulated_elapsed();
353   MPI_Status status;
354
355   MPI_Request request = get_reqq_self()->back();
356   get_reqq_self()->pop_back();
357   //if request is null here, this may mean that a previous test has succeeded 
358   //Different times in traced application and replayed version may lead to this 
359   //In this case, ignore the extra calls.
360   if(request!=nullptr){
361     int rank = smpi_process_index();
362     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
363     extra->type=TRACING_TEST;
364     TRACE_smpi_testing_in(rank, extra);
365
366     int flag = smpi_mpi_test(&request, &status);
367
368     XBT_DEBUG("MPI_Test result: %d", flag);
369     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
370     get_reqq_self()->push_back(request);
371
372     TRACE_smpi_testing_out(rank);
373   }
374   log_timed_action (action, clock);
375 }
376
377 static void action_wait(const char *const *action){
378   CHECK_ACTION_PARAMS(action, 0, 0)
379   double clock = smpi_process_simulated_elapsed();
380   MPI_Status status;
381
382   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
383       xbt_str_join_array(action," "));
384   MPI_Request request = get_reqq_self()->back();
385   get_reqq_self()->pop_back();
386
387   if (request==nullptr){
388     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
389     return;
390   }
391
392   int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
393
394   MPI_Group group = smpi_comm_group(request->comm);
395   int src_traced = smpi_group_rank(group, request->src);
396   int dst_traced = smpi_group_rank(group, request->dst);
397   int is_wait_for_receive = request->recv;
398   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
399   extra->type = TRACING_WAIT;
400   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
401
402   smpi_mpi_wait(&request, &status);
403
404   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
405   if (is_wait_for_receive)
406     TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
407   log_timed_action (action, clock);
408 }
409
410 static void action_waitall(const char *const *action){
411   CHECK_ACTION_PARAMS(action, 0, 0)
412   double clock = smpi_process_simulated_elapsed();
413   unsigned int count_requests=get_reqq_self()->size();
414
415   if (count_requests>0) {
416     MPI_Status status[count_requests];
417
418    int rank_traced = smpi_process_index();
419    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
420    extra->type = TRACING_WAITALL;
421    extra->send_size=count_requests;
422    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
423    int recvs_snd[count_requests];
424    int recvs_rcv[count_requests];
425    unsigned int i=0;
426    for (auto req : *(get_reqq_self())){
427      if (req && req->recv){
428        recvs_snd[i]=req->src;
429        recvs_rcv[i]=req->dst;
430      }else
431        recvs_snd[i]=-100;
432      i++;
433    }
434    smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
435
436    for (i=0; i<count_requests;i++){
437      if (recvs_snd[i]!=-100)
438        TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
439    }
440    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
441   }
442   log_timed_action (action, clock);
443 }
444
445 static void action_barrier(const char *const *action){
446   double clock = smpi_process_simulated_elapsed();
447   int rank = smpi_process_index();
448   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
449   extra->type = TRACING_BARRIER;
450   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
451
452   mpi_coll_barrier_fun(MPI_COMM_WORLD);
453
454   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
455   log_timed_action (action, clock);
456 }
457
458 static void action_bcast(const char *const *action)
459 {
460   CHECK_ACTION_PARAMS(action, 1, 2)
461   double size = parse_double(action[2]);
462   double clock = smpi_process_simulated_elapsed();
463   int root=0;
464   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
465   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
466
467   if(action[3]) {
468     root= atoi(action[3]);
469     if(action[4]) {
470       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
471     }
472   }
473
474   int rank = smpi_process_index();
475   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
476
477   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
478   extra->type = TRACING_BCAST;
479   extra->send_size = size;
480   extra->root = root_traced;
481   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
482   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
483   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
484
485   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
486
487   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
488   log_timed_action (action, clock);
489 }
490
491 static void action_reduce(const char *const *action)
492 {
493   CHECK_ACTION_PARAMS(action, 2, 2)
494   double comm_size = parse_double(action[2]);
495   double comp_size = parse_double(action[3]);
496   double clock = smpi_process_simulated_elapsed();
497   int root=0;
498   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
499
500   if(action[4]) {
501     root= atoi(action[4]);
502     if(action[5]) {
503       MPI_CURRENT_TYPE=decode_datatype(action[5]);
504     }
505   }
506
507   int rank = smpi_process_index();
508   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
509   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
510   extra->type = TRACING_REDUCE;
511   extra->send_size = comm_size;
512   extra->comp_size = comp_size;
513   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
514   extra->root = root_traced;
515
516   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
517
518   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
519   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
520   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
521   smpi_execute_flops(comp_size);
522
523   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
524   log_timed_action (action, clock);
525 }
526
527 static void action_allReduce(const char *const *action) {
528   CHECK_ACTION_PARAMS(action, 2, 1)
529   double comm_size = parse_double(action[2]);
530   double comp_size = parse_double(action[3]);
531
532   if(action[4])
533     MPI_CURRENT_TYPE=decode_datatype(action[4]);
534   else
535     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
536
537   double clock = smpi_process_simulated_elapsed();
538   int rank = smpi_process_index();
539   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
540   extra->type = TRACING_ALLREDUCE;
541   extra->send_size = comm_size;
542   extra->comp_size = comp_size;
543   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
544   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
545
546   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
547   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
548   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
549   smpi_execute_flops(comp_size);
550
551   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
552   log_timed_action (action, clock);
553 }
554
555 static void action_allToAll(const char *const *action) {
556   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
557                                      //two optional (corresponding datatypes)
558   double clock = smpi_process_simulated_elapsed();
559   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
560   int send_size = parse_double(action[2]);
561   int recv_size = parse_double(action[3]);
562   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
563
564   if(action[4] && action[5]) {
565     MPI_CURRENT_TYPE=decode_datatype(action[4]);
566     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
567   }
568   else{
569     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
570   }
571
572   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
573   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
574
575   int rank = smpi_process_index();
576   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
577   extra->type = TRACING_ALLTOALL;
578   extra->send_size = send_size;
579   extra->recv_size = recv_size;
580   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
581   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
582
583   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
584
585   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
586
587   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
588   log_timed_action (action, clock);
589 }
590
591 static void action_gather(const char *const *action) {
592   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
593         0 gather 68 68 0 0 0
594       where:
595         1) 68 is the sendcounts
596         2) 68 is the recvcounts
597         3) 0 is the root node
598         4) 0 is the send datatype id, see decode_datatype()
599         5) 0 is the recv datatype id, see decode_datatype()
600   */
601   CHECK_ACTION_PARAMS(action, 2, 3)
602   double clock = smpi_process_simulated_elapsed();
603   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
604   int send_size = parse_double(action[2]);
605   int recv_size = parse_double(action[3]);
606   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
607   if(action[4] && action[5]) {
608     MPI_CURRENT_TYPE=decode_datatype(action[5]);
609     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
610   } else {
611     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
612   }
613   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
614   void *recv = nullptr;
615   int root=0;
616   if(action[4])
617     root=atoi(action[4]);
618   int rank = smpi_comm_rank(MPI_COMM_WORLD);
619
620   if(rank==root)
621     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
622
623   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
624   extra->type = TRACING_GATHER;
625   extra->send_size = send_size;
626   extra->recv_size = recv_size;
627   extra->root = root;
628   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
629   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
630
631   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
632
633   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
634
635   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
636   log_timed_action (action, clock);
637 }
638
639 static void action_gatherv(const char *const *action) {
640   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
641        0 gather 68 68 10 10 10 0 0 0
642      where:
643        1) 68 is the sendcount
644        2) 68 10 10 10 is the recvcounts
645        3) 0 is the root node
646        4) 0 is the send datatype id, see decode_datatype()
647        5) 0 is the recv datatype id, see decode_datatype()
648   */
649
650   double clock = smpi_process_simulated_elapsed();
651   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
652   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
653   int send_size = parse_double(action[2]);
654   int disps[comm_size] = { 0 };
655   int recvcounts[comm_size];
656   int i=0,recv_sum=0;
657
658   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
659   if(action[4+comm_size] && action[5+comm_size]) {
660     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
661     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
662   } else {
663     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
664   }
665   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
666   void *recv = nullptr;
667   for(i=0;i<comm_size;i++) {
668     recvcounts[i] = atoi(action[i+3]);
669     recv_sum=recv_sum+recvcounts[i];
670   }
671
672   int root=atoi(action[3+comm_size]);
673   int rank = smpi_comm_rank(MPI_COMM_WORLD);
674
675   if(rank==root)
676     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
677
678   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
679   extra->type = TRACING_GATHERV;
680   extra->send_size = send_size;
681   extra->recvcounts= xbt_new(int,comm_size);
682   for(i=0; i< comm_size; i++)//copy data to avoid bad free
683     extra->recvcounts[i] = recvcounts[i];
684   extra->root = root;
685   extra->num_processes = comm_size;
686   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
687   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
688
689   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
690
691   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
692
693   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
694   log_timed_action (action, clock);
695 }
696
697 static void action_reducescatter(const char *const *action) {
698  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
699       0 reduceScatter 275427 275427 275427 204020 11346849 0
700     where:
701       1) The first four values after the name of the action declare the recvcounts array
702       2) The value 11346849 is the amount of instructions
703       3) The last value corresponds to the datatype, see decode_datatype().
704 */
705   double clock = smpi_process_simulated_elapsed();
706   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
707   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
708   int comp_size = parse_double(action[2+comm_size]);
709   int recvcounts[comm_size];
710   int rank = smpi_process_index();
711   int size = 0;
712   if(action[3+comm_size])
713     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
714   else
715     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
716
717   for(int i=0;i<comm_size;i++) {
718     recvcounts[i] = atoi(action[i+2]);
719     size+=recvcounts[i];
720   }
721
722   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
723   extra->type = TRACING_REDUCE_SCATTER;
724   extra->send_size = 0;
725   extra->recvcounts= xbt_new(int, comm_size);
726   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
727     extra->recvcounts[i] = recvcounts[i];
728   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
729   extra->comp_size = comp_size;
730   extra->num_processes = comm_size;
731
732   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
733
734   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
735   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
736
737   mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
738   smpi_execute_flops(comp_size);
739
740   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
741   log_timed_action (action, clock);
742 }
743
744 static void action_allgather(const char *const *action) {
745   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
746         0 allGather 275427 275427
747     where:
748         1) 275427 is the sendcount
749         2) 275427 is the recvcount
750         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
751   */
752   double clock = smpi_process_simulated_elapsed();
753
754   CHECK_ACTION_PARAMS(action, 2, 2)
755   int sendcount=atoi(action[2]); 
756   int recvcount=atoi(action[3]); 
757
758   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
759
760   if(action[4] && action[5]) {
761     MPI_CURRENT_TYPE = decode_datatype(action[4]);
762     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
763   } else {
764     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
765   }
766   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
767   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
768
769   int rank = smpi_process_index();
770   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
771   extra->type = TRACING_ALLGATHER;
772   extra->send_size = sendcount;
773   extra->recv_size= recvcount;
774   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
775   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
776   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
777
778   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
779
780   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
781
782   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
783   log_timed_action (action, clock);
784 }
785
786 static void action_allgatherv(const char *const *action) {
787   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
788         0 allGatherV 275427 275427 275427 275427 204020
789      where:
790         1) 275427 is the sendcount
791         2) The next four elements declare the recvcounts array
792         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
793   */
794   double clock = smpi_process_simulated_elapsed();
795
796   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
797   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
798   int sendcount=atoi(action[2]);
799   int recvcounts[comm_size];
800   int disps[comm_size] = { 0 };
801   int recv_sum=0;
802   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
803
804   if(action[3+comm_size] && action[4+comm_size]) {
805     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
806     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
807   } else {
808     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
809   }
810   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
811
812   for(int i=0;i<comm_size;i++) {
813     recvcounts[i] = atoi(action[i+3]);
814     recv_sum=recv_sum+recvcounts[i];
815   }
816   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
817
818   int rank = smpi_process_index();
819   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
820   extra->type = TRACING_ALLGATHERV;
821   extra->send_size = sendcount;
822   extra->recvcounts= xbt_new(int, comm_size);
823   for(int i=0; i< comm_size; i++)//copy data to avoid bad free
824     extra->recvcounts[i] = recvcounts[i];
825   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
826   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
827   extra->num_processes = comm_size;
828
829   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
830
831   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
832                           MPI_COMM_WORLD);
833
834   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
835   log_timed_action (action, clock);
836 }
837
838 static void action_allToAllv(const char *const *action) {
839   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
840         0 allToAllV 100 1 7 10 12 100 1 70 10 5
841      where:
842         1) 100 is the size of the send buffer *sizeof(int),
843         2) 1 7 10 12 is the sendcounts array
844         3) 100*sizeof(int) is the size of the receiver buffer
845         4)  1 70 10 5 is the recvcounts array
846   */
847   double clock = smpi_process_simulated_elapsed();
848
849   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
850   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
851   int sendcounts[comm_size];
852   int recvcounts[comm_size];
853   int senddisps[comm_size] = { 0 };
854   int recvdisps[comm_size] = { 0 };
855
856   MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
857
858   int send_buf_size=parse_double(action[2]);
859   int recv_buf_size=parse_double(action[3+comm_size]);
860   if(action[4+2*comm_size] && action[5+2*comm_size]) {
861     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
862     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
863   }
864   else{
865     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
866   }
867
868   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
869   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
870
871   for(int i=0;i<comm_size;i++) {
872     sendcounts[i] = atoi(action[i+3]);
873     recvcounts[i] = atoi(action[i+4+comm_size]);
874   }
875
876   int rank = smpi_process_index();
877   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
878   extra->type = TRACING_ALLTOALLV;
879   extra->recvcounts= xbt_new(int, comm_size);
880   extra->sendcounts= xbt_new(int, comm_size);
881   extra->num_processes = comm_size;
882
883   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
884     extra->send_size += sendcounts[i];
885     extra->sendcounts[i] = sendcounts[i];
886     extra->recv_size += recvcounts[i];
887     extra->recvcounts[i] = recvcounts[i];
888   }
889   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
890   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
891
892   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
893
894   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
895                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
896
897   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
898   log_timed_action (action, clock);
899 }
900
901 void smpi_replay_run(int *argc, char***argv){
902   /* First initializes everything */
903   smpi_process_init(argc, argv);
904   smpi_process_mark_as_initialized();
905   smpi_process_set_replaying(true);
906
907   int rank = smpi_process_index();
908   TRACE_smpi_init(rank);
909   TRACE_smpi_computing_init(rank);
910   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
911   extra->type = TRACING_INIT;
912   char *operation =bprintf("%s_init",__FUNCTION__);
913   TRACE_smpi_collective_in(rank, -1, operation, extra);
914   TRACE_smpi_collective_out(rank, -1, operation);
915   xbt_free(operation);
916
917   if (_xbt_replay_action_init()==0) {
918     xbt_replay_action_register("init",       action_init);
919     xbt_replay_action_register("finalize",   action_finalize);
920     xbt_replay_action_register("comm_size",  action_comm_size);
921     xbt_replay_action_register("comm_split", action_comm_split);
922     xbt_replay_action_register("comm_dup",   action_comm_dup);
923     xbt_replay_action_register("send",       action_send);
924     xbt_replay_action_register("Isend",      action_Isend);
925     xbt_replay_action_register("recv",       action_recv);
926     xbt_replay_action_register("Irecv",      action_Irecv);
927     xbt_replay_action_register("test",       action_test);
928     xbt_replay_action_register("wait",       action_wait);
929     xbt_replay_action_register("waitAll",    action_waitall);
930     xbt_replay_action_register("barrier",    action_barrier);
931     xbt_replay_action_register("bcast",      action_bcast);
932     xbt_replay_action_register("reduce",     action_reduce);
933     xbt_replay_action_register("allReduce",  action_allReduce);
934     xbt_replay_action_register("allToAll",   action_allToAll);
935     xbt_replay_action_register("allToAllV",  action_allToAllv);
936     xbt_replay_action_register("gather",  action_gather);
937     xbt_replay_action_register("gatherV",  action_gatherv);
938     xbt_replay_action_register("allGather",  action_allgather);
939     xbt_replay_action_register("allGatherV",  action_allgatherv);
940     xbt_replay_action_register("reduceScatter",  action_reducescatter);
941     xbt_replay_action_register("compute",    action_compute);
942   }
943
944   //if we have a delayed start, sleep here.
945   if(*argc>2){
946     char *endptr;
947     double value = strtod((*argv)[2], &endptr);
948     if (*endptr != '\0')
949       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
950     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
951     smpi_execute_flops(value);
952   } else {
953     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
954     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
955     smpi_execute_flops(0.0);
956   }
957
958   /* Actually run the replay */
959   xbt_replay_action_runner(*argc, *argv);
960
961   /* and now, finalize everything */
962   /* One active process will stop. Decrease the counter*/
963   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
964   if (!get_reqq_self()->empty()){
965     unsigned int count_requests=get_reqq_self()->size();
966     MPI_Request requests[count_requests];
967     MPI_Status status[count_requests];
968     unsigned int i=0;
969
970     for (auto req: *get_reqq_self()){
971       requests[i] = req;
972       i++;
973     }
974     smpi_mpi_waitall(count_requests, requests, status);
975   }
976   active_processes--;
977
978   if(active_processes==0){
979     /* Last process alive speaking: end the simulated timer */
980     XBT_INFO("Simulation time %f", smpi_process_simulated_elapsed());
981     _xbt_replay_action_exit();
982     xbt_free(sendbuffer);
983     xbt_free(recvbuffer);
984   }
985
986   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
987   extra_fin->type = TRACING_FINALIZE;
988   operation =bprintf("%s_finalize",__FUNCTION__);
989   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
990
991   smpi_process_finalize();
992
993   TRACE_smpi_collective_out(rank, -1, operation);
994   TRACE_smpi_finalize(smpi_process_index());
995   smpi_process_destroy();
996   xbt_free(operation);
997 }