Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
reindent and cosmetics
[simgrid.git] / src / smpi / smpi_replay.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include <stdio.h>
9 #include <xbt.h>
10 #include <xbt/replay.h>
11 #include <unordered_map>
12 #include <vector>
13
14 #define KEY_SIZE (sizeof(int) * 2 + 1)
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
17
18 int communicator_size = 0;
19 static int active_processes = 0;
20 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
21
22 MPI_Datatype MPI_DEFAULT_TYPE;
23 MPI_Datatype MPI_CURRENT_TYPE;
24
25 static int sendbuffer_size=0;
26 char* sendbuffer=nullptr;
27 static int recvbuffer_size=0;
28 char* recvbuffer=nullptr;
29
30 static void log_timed_action (const char *const *action, double clock){
31   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
32     char *name = xbt_str_join_array(action, " ");
33     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
34     xbt_free(name);
35   }
36 }
37
38 static std::vector<MPI_Request>* get_reqq_self()
39 {
40   return reqq.at(smpi_process_index());
41 }
42
43 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
44 {
45    reqq.insert({smpi_process_index(), mpi_request});
46 }
47
48 //allocate a single buffer for all sends, growing it if needed
49 void* smpi_get_tmp_sendbuffer(int size)
50 {
51   if (!smpi_process_get_replaying())
52     return xbt_malloc(size);
53   if (sendbuffer_size<size){
54     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
55     sendbuffer_size=size;
56   }
57   return sendbuffer;
58 }
59
60 //allocate a single buffer for all recv
61 void* smpi_get_tmp_recvbuffer(int size){
62   if (!smpi_process_get_replaying())
63   return xbt_malloc(size);
64   if (recvbuffer_size<size){
65     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
66     recvbuffer_size=size;
67   }
68   return recvbuffer;
69 }
70
71 void smpi_free_tmp_buffer(void* buf){
72   if (!smpi_process_get_replaying())
73     xbt_free(buf);
74 }
75
76 /* Helper function */
77 static double parse_double(const char *string)
78 {
79   double value;
80   char *endptr;
81   value = strtod(string, &endptr);
82   if (*endptr != '\0')
83     THROWF(unknown_error, 0, "%s is not a double", string);
84   return value;
85 }
86
87 static MPI_Datatype decode_datatype(const char *const action)
88 {
89 // Declared datatypes,
90   switch(atoi(action)) {
91     case 0:
92       MPI_CURRENT_TYPE=MPI_DOUBLE;
93       break;
94     case 1:
95       MPI_CURRENT_TYPE=MPI_INT;
96       break;
97     case 2:
98       MPI_CURRENT_TYPE=MPI_CHAR;
99       break;
100     case 3:
101       MPI_CURRENT_TYPE=MPI_SHORT;
102       break;
103     case 4:
104       MPI_CURRENT_TYPE=MPI_LONG;
105       break;
106     case 5:
107       MPI_CURRENT_TYPE=MPI_FLOAT;
108       break;
109     case 6:
110       MPI_CURRENT_TYPE=MPI_BYTE;
111       break;
112     default:
113       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
114   }
115    return MPI_CURRENT_TYPE;
116 }
117
118
119 const char* encode_datatype(MPI_Datatype datatype, int* known)
120 {
121   //default type for output is set to MPI_BYTE
122   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
123   if(known!=nullptr)
124     *known=1;
125   if (datatype==MPI_BYTE){
126       return "";
127   }
128   if(datatype==MPI_DOUBLE)
129       return "0";
130   if(datatype==MPI_INT)
131       return "1";
132   if(datatype==MPI_CHAR)
133       return "2";
134   if(datatype==MPI_SHORT)
135       return "3";
136   if(datatype==MPI_LONG)
137     return "4";
138   if(datatype==MPI_FLOAT)
139       return "5";
140   //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
141   if(known!=nullptr)
142     *known=0;
143   // default - not implemented.
144   // do not warn here as we pass in this function even for other trace formats
145   return "-1";
146 }
147
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
149     int i=0;\
150     while(action[i]!=nullptr)\
151      i++;\
152     if(i<mandatory+2)                                           \
153     THROWF(arg_error, 0, "%s replay failed.\n" \
154           "%d items were given on the line. First two should be process_id and action.  " \
155           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
157   }
158
159 static void action_init(const char *const *action)
160 {
161   XBT_DEBUG("Initialize the counters");
162   CHECK_ACTION_PARAMS(action, 0, 1)
163   if(action[2]) 
164     MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE dataype 
165   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
166
167   /* start a simulated timer */
168   smpi_process_simulated_start();
169   /*initialize the number of active processes */
170   active_processes = smpi_process_count();
171
172   set_reqq_self(new std::vector<MPI_Request>);
173 }
174
175 static void action_finalize(const char *const *action)
176 {
177   /* do nothing */
178 }
179
180 static void action_comm_size(const char *const *action)
181 {
182   double clock = smpi_process_simulated_elapsed();
183
184   communicator_size = parse_double(action[2]);
185   log_timed_action (action, clock);
186 }
187
188 static void action_comm_split(const char *const *action)
189 {
190   double clock = smpi_process_simulated_elapsed();
191
192   log_timed_action (action, clock);
193 }
194
195 static void action_comm_dup(const char *const *action)
196 {
197   double clock = smpi_process_simulated_elapsed();
198
199   log_timed_action (action, clock);
200 }
201
202 static void action_compute(const char *const *action)
203 {
204   CHECK_ACTION_PARAMS(action, 1, 0)
205   double clock = smpi_process_simulated_elapsed();
206   double flops= parse_double(action[2]);
207   int rank = smpi_process_index();
208   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
209   extra->type=TRACING_COMPUTING;
210   extra->comp_size=flops;
211   TRACE_smpi_computing_in(rank, extra);
212
213   smpi_execute_flops(flops);
214
215   TRACE_smpi_computing_out(rank);
216   log_timed_action (action, clock);
217 }
218
219 static void action_send(const char *const *action)
220 {
221   CHECK_ACTION_PARAMS(action, 2, 1)
222   int to = atoi(action[2]);
223   double size=parse_double(action[3]);
224   double clock = smpi_process_simulated_elapsed();
225
226   if(action[4])
227     MPI_CURRENT_TYPE=decode_datatype(action[4]);
228   else
229     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
230
231   int rank = smpi_process_index();
232
233   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
234   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
235   extra->type = TRACING_SEND;
236   extra->send_size = size;
237   extra->src = rank;
238   extra->dst = dst_traced;
239   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
240   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
241   if (!TRACE_smpi_view_internals()) {
242     TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
243   }
244
245   smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
246
247   log_timed_action (action, clock);
248
249   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
250 }
251
252 static void action_Isend(const char *const *action)
253 {
254   CHECK_ACTION_PARAMS(action, 2, 1)
255   int to = atoi(action[2]);
256   double size=parse_double(action[3]);
257   double clock = smpi_process_simulated_elapsed();
258   MPI_Request request;
259
260   if(action[4]) 
261     MPI_CURRENT_TYPE=decode_datatype(action[4]);
262   else 
263     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
264
265   int rank = smpi_process_index();
266   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
267   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
268   extra->type = TRACING_ISEND;
269   extra->send_size = size;
270   extra->src = rank;
271   extra->dst = dst_traced;
272   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
273   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
274   if (!TRACE_smpi_view_internals()) {
275     TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
276   }
277
278   request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
279
280   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
281   request->send = 1;
282
283   get_reqq_self()->push_back(request);
284
285   log_timed_action (action, clock);
286 }
287
288 static void action_recv(const char *const *action) {
289   CHECK_ACTION_PARAMS(action, 2, 1)
290   int from = atoi(action[2]);
291   double size=parse_double(action[3]);
292   double clock = smpi_process_simulated_elapsed();
293   MPI_Status status;
294
295   if(action[4]) 
296     MPI_CURRENT_TYPE=decode_datatype(action[4]);
297   else 
298     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
299
300   int rank = smpi_process_index();
301   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
302
303   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
304   extra->type = TRACING_RECV;
305   extra->send_size = size;
306   extra->src = src_traced;
307   extra->dst = rank;
308   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
309   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
310
311   //unknown size from the receiver point of view
312   if(size<=0.0){
313     smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
314     size=status.count;
315   }
316
317   smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
318
319   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
320   if (!TRACE_smpi_view_internals()) {
321     TRACE_smpi_recv(rank, src_traced, rank);
322   }
323
324   log_timed_action (action, clock);
325 }
326
327 static void action_Irecv(const char *const *action)
328 {
329   CHECK_ACTION_PARAMS(action, 2, 1)
330   int from = atoi(action[2]);
331   double size=parse_double(action[3]);
332   double clock = smpi_process_simulated_elapsed();
333   MPI_Request request;
334
335   if(action[4]) 
336     MPI_CURRENT_TYPE=decode_datatype(action[4]);
337   else 
338     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
339
340   int rank = smpi_process_index();
341   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
342   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
343   extra->type = TRACING_IRECV;
344   extra->send_size = size;
345   extra->src = src_traced;
346   extra->dst = rank;
347   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
348   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
349   MPI_Status status;
350   //unknow size from the receiver pov
351   if(size<=0.0){
352       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
353       size=status.count;
354   }
355
356   request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
357
358   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
359   request->recv = 1;
360   get_reqq_self()->push_back(request);
361
362   log_timed_action (action, clock);
363 }
364
365 static void action_test(const char *const *action){
366   CHECK_ACTION_PARAMS(action, 0, 0)
367   double clock = smpi_process_simulated_elapsed();
368   MPI_Status status;
369   int flag = true;
370
371   MPI_Request request = get_reqq_self()->back();
372   get_reqq_self()->pop_back();
373   //if request is null here, this may mean that a previous test has succeeded 
374   //Different times in traced application and replayed version may lead to this 
375   //In this case, ignore the extra calls.
376   if(request!=nullptr){
377     int rank = smpi_process_index();
378     instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
379     extra->type=TRACING_TEST;
380     TRACE_smpi_testing_in(rank, extra);
381
382     flag = smpi_mpi_test(&request, &status);
383
384     XBT_DEBUG("MPI_Test result: %d", flag);
385     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
386     get_reqq_self()->push_back(request);
387
388     TRACE_smpi_testing_out(rank);
389   }
390   log_timed_action (action, clock);
391 }
392
393 static void action_wait(const char *const *action){
394   CHECK_ACTION_PARAMS(action, 0, 0)
395   double clock = smpi_process_simulated_elapsed();
396   MPI_Status status;
397
398   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
399       xbt_str_join_array(action," "));
400   MPI_Request request = get_reqq_self()->back();
401   get_reqq_self()->pop_back();
402
403   if (request==nullptr){
404     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
405     return;
406   }
407
408   int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
409
410   MPI_Group group = smpi_comm_group(request->comm);
411   int src_traced = smpi_group_rank(group, request->src);
412   int dst_traced = smpi_group_rank(group, request->dst);
413   int is_wait_for_receive = request->recv;
414   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
415   extra->type = TRACING_WAIT;
416   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
417
418   smpi_mpi_wait(&request, &status);
419
420   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
421   if (is_wait_for_receive)
422     TRACE_smpi_recv(rank, src_traced, dst_traced);
423   log_timed_action (action, clock);
424 }
425
426 static void action_waitall(const char *const *action){
427   CHECK_ACTION_PARAMS(action, 0, 0)
428   double clock = smpi_process_simulated_elapsed();
429   unsigned int count_requests=get_reqq_self()->size();
430
431   if (count_requests>0) {
432     MPI_Status status[count_requests];
433
434    int rank_traced = smpi_process_index();
435    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
436    extra->type = TRACING_WAITALL;
437    extra->send_size=count_requests;
438    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
439
440    smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
441
442    for (auto req : *(get_reqq_self())){
443      if (req && req->recv)
444        TRACE_smpi_recv(rank_traced, req->src, req->dst);
445    }
446    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
447   }
448   log_timed_action (action, clock);
449 }
450
451 static void action_barrier(const char *const *action){
452   double clock = smpi_process_simulated_elapsed();
453   int rank = smpi_process_index();
454   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
455   extra->type = TRACING_BARRIER;
456   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
457
458   mpi_coll_barrier_fun(MPI_COMM_WORLD);
459
460   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
461   log_timed_action (action, clock);
462 }
463
464 static void action_bcast(const char *const *action)
465 {
466   CHECK_ACTION_PARAMS(action, 1, 2)
467   double size = parse_double(action[2]);
468   double clock = smpi_process_simulated_elapsed();
469   int root=0;
470   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
471   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
472
473   if(action[3]) {
474     root= atoi(action[3]);
475     if(action[4]) {
476       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
477     }
478   }
479
480   int rank = smpi_process_index();
481   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
482
483   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
484   extra->type = TRACING_BCAST;
485   extra->send_size = size;
486   extra->root = root_traced;
487   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
488   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
489   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
490
491   mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
492
493   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
494   log_timed_action (action, clock);
495 }
496
497 static void action_reduce(const char *const *action)
498 {
499   CHECK_ACTION_PARAMS(action, 2, 2)
500   double comm_size = parse_double(action[2]);
501   double comp_size = parse_double(action[3]);
502   double clock = smpi_process_simulated_elapsed();
503   int root=0;
504   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
505
506   if(action[4]) {
507     root= atoi(action[4]);
508     if(action[5]) {
509       MPI_CURRENT_TYPE=decode_datatype(action[5]);
510     }
511   }
512
513   int rank = smpi_process_index();
514   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
515   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
516   extra->type = TRACING_REDUCE;
517   extra->send_size = comm_size;
518   extra->comp_size = comp_size;
519   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
520   extra->root = root_traced;
521
522   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
523
524   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
525   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
526   mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
527   smpi_execute_flops(comp_size);
528
529   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
530   log_timed_action (action, clock);
531 }
532
533 static void action_allReduce(const char *const *action) {
534   CHECK_ACTION_PARAMS(action, 2, 1)
535   double comm_size = parse_double(action[2]);
536   double comp_size = parse_double(action[3]);
537
538   if(action[4])
539     MPI_CURRENT_TYPE=decode_datatype(action[4]);
540   else
541     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
542
543   double clock = smpi_process_simulated_elapsed();
544   int rank = smpi_process_index();
545   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
546   extra->type = TRACING_ALLREDUCE;
547   extra->send_size = comm_size;
548   extra->comp_size = comp_size;
549   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
550   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
551
552   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
553   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
554   mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
555   smpi_execute_flops(comp_size);
556
557   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
558   log_timed_action (action, clock);
559 }
560
561 static void action_allToAll(const char *const *action) {
562   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
563                                      //two optional (corresponding datatypes)
564   double clock = smpi_process_simulated_elapsed();
565   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
566   int send_size = parse_double(action[2]);
567   int recv_size = parse_double(action[3]);
568   MPI_Datatype MPI_CURRENT_TYPE2;
569
570   if(action[4] && action[5]) {
571     MPI_CURRENT_TYPE=decode_datatype(action[4]);
572     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
573   }
574   else{
575     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
576     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
577   }
578
579   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
580   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
581
582   int rank = smpi_process_index();
583   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
584   extra->type = TRACING_ALLTOALL;
585   extra->send_size = send_size;
586   extra->recv_size = recv_size;
587   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
588   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
589
590   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
591
592   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
593
594   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
595   log_timed_action (action, clock);
596 }
597
598 static void action_gather(const char *const *action) {
599   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
600  0 gather 68 68 0 0 0
601
602   where: 
603   1) 68 is the sendcounts
604   2) 68 is the recvcounts
605   3) 0 is the root node
606   4) 0 is the send datatype id, see decode_datatype()
607   5) 0 is the recv datatype id, see decode_datatype()
608   */
609   CHECK_ACTION_PARAMS(action, 2, 3)
610   double clock = smpi_process_simulated_elapsed();
611   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
612   int send_size = parse_double(action[2]);
613   int recv_size = parse_double(action[3]);
614   MPI_Datatype MPI_CURRENT_TYPE2;
615   if(action[4] && action[5]) {
616     MPI_CURRENT_TYPE=decode_datatype(action[5]);
617     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
618   } else {
619     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
620     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
621   }
622   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
623   void *recv = nullptr;
624   int root=0;
625   if(action[4])
626     root=atoi(action[4]);
627   int rank = smpi_comm_rank(MPI_COMM_WORLD);
628
629   if(rank==root)
630     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
631
632   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
633   extra->type = TRACING_GATHER;
634   extra->send_size = send_size;
635   extra->recv_size = recv_size;
636   extra->root = root;
637   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
638   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
639
640   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
641
642   mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
643
644   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
645   log_timed_action (action, clock);
646 }
647
648 static void action_gatherv(const char *const *action) {
649   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
650  0 gather 68 68 10 10 10 0 0 0
651
652   where:
653   1) 68 is the sendcount
654   2) 68 10 10 10 is the recvcounts
655   3) 0 is the root node
656   4) 0 is the send datatype id, see decode_datatype()
657   5) 0 is the recv datatype id, see decode_datatype()
658   */
659
660   double clock = smpi_process_simulated_elapsed();
661   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
662   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
663   int send_size = parse_double(action[2]);
664   int *disps = xbt_new0(int, comm_size);
665   int *recvcounts = xbt_new0(int, comm_size);
666   int i=0,recv_sum=0;
667
668   MPI_Datatype MPI_CURRENT_TYPE2;
669   if(action[4+comm_size] && action[5+comm_size]) {
670     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
671     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
672   } else {
673     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
674     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
675   }
676   void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
677   void *recv = nullptr;
678   for(i=0;i<comm_size;i++) {
679     recvcounts[i] = atoi(action[i+3]);
680     recv_sum=recv_sum+recvcounts[i];
681     disps[i] = 0;
682   }
683
684   int root=atoi(action[3+comm_size]);
685   int rank = smpi_comm_rank(MPI_COMM_WORLD);
686
687   if(rank==root)
688     recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
689
690   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
691   extra->type = TRACING_GATHERV;
692   extra->send_size = send_size;
693   extra->recvcounts= xbt_new(int,comm_size);
694   for(i=0; i< comm_size; i++)//copy data to avoid bad free
695     extra->recvcounts[i] = recvcounts[i];
696   extra->root = root;
697   extra->num_processes = comm_size;
698   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
699   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
700
701   TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
702
703   smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
704
705   TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
706   log_timed_action (action, clock);
707   xbt_free(recvcounts);
708   xbt_free(disps);
709 }
710
711 static void action_reducescatter(const char *const *action) {
712  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
713 0 reduceScatter 275427 275427 275427 204020 11346849 0
714
715   where: 
716   1) The first four values after the name of the action declare the recvcounts array
717   2) The value 11346849 is the amount of instructions
718   3) The last value corresponds to the datatype, see decode_datatype().
719
720   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
721   double clock = smpi_process_simulated_elapsed();
722   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
723   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
724   int comp_size = parse_double(action[2+comm_size]);
725   int *recvcounts = xbt_new0(int, comm_size);  
726   int *disps = xbt_new0(int, comm_size);  
727   int i=0;
728   int rank = smpi_process_index();
729   int size = 0;
730   if(action[3+comm_size])
731     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
732   else
733     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
734
735   for(i=0;i<comm_size;i++) {
736     recvcounts[i] = atoi(action[i+2]);
737     disps[i] = 0;
738     size+=recvcounts[i];
739   }
740
741   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
742   extra->type = TRACING_REDUCE_SCATTER;
743   extra->send_size = 0;
744   extra->recvcounts= xbt_new(int, comm_size);
745   for(i=0; i< comm_size; i++)//copy data to avoid bad free
746     extra->recvcounts[i] = recvcounts[i];
747   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
748   extra->comp_size = comp_size;
749   extra->num_processes = comm_size;
750
751   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
752
753   void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
754   void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
755    
756    mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
757    smpi_execute_flops(comp_size);
758
759   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
760   xbt_free(recvcounts);
761   xbt_free(disps);
762   log_timed_action (action, clock);
763 }
764
765 static void action_allgather(const char *const *action) {
766   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
767   0 allGather 275427 275427
768
769   where: 
770   1) 275427 is the sendcount
771   2) 275427 is the recvcount
772   3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().  */
773   double clock = smpi_process_simulated_elapsed();
774
775   CHECK_ACTION_PARAMS(action, 2, 2)
776   int sendcount=atoi(action[2]); 
777   int recvcount=atoi(action[3]); 
778
779   MPI_Datatype MPI_CURRENT_TYPE2;
780
781   if(action[4] && action[5]) {
782     MPI_CURRENT_TYPE = decode_datatype(action[4]);
783     MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
784   } else {
785     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
786     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
787   }
788   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
789   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
790
791   int rank = smpi_process_index();
792   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
793   extra->type = TRACING_ALLGATHER;
794   extra->send_size = sendcount;
795   extra->recv_size= recvcount;
796   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
797   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
798   extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
799
800   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
801
802   mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
803
804   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
805   log_timed_action (action, clock);
806 }
807
808 static void action_allgatherv(const char *const *action) {
809   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
810 0 allGatherV 275427 275427 275427 275427 204020
811
812   where: 
813   1) 275427 is the sendcount
814   2) The next four elements declare the recvcounts array
815   3) No more values mean that the datatype for sent and receive buffer
816   is the default one, see decode_datatype(). */
817   double clock = smpi_process_simulated_elapsed();
818
819   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
820   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
821   int i=0;
822   int sendcount=atoi(action[2]);
823   int *recvcounts = xbt_new0(int, comm_size);  
824   int *disps = xbt_new0(int, comm_size);  
825   int recv_sum=0;  
826   MPI_Datatype MPI_CURRENT_TYPE2;
827
828   if(action[3+comm_size] && action[4+comm_size]) {
829     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
830     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
831   } else {
832     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
833     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
834   }
835   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
836
837   for(i=0;i<comm_size;i++) {
838     recvcounts[i] = atoi(action[i+3]);
839     recv_sum=recv_sum+recvcounts[i];
840   }
841   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
842
843   int rank = smpi_process_index();
844   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
845   extra->type = TRACING_ALLGATHERV;
846   extra->send_size = sendcount;
847   extra->recvcounts= xbt_new(int, comm_size);
848   for(i=0; i< comm_size; i++)//copy data to avoid bad free
849     extra->recvcounts[i] = recvcounts[i];
850   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
851   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
852   extra->num_processes = comm_size;
853
854   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
855
856   mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
857                           MPI_COMM_WORLD);
858
859   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
860   log_timed_action (action, clock);
861   xbt_free(recvcounts);
862   xbt_free(disps);
863 }
864
865 static void action_allToAllv(const char *const *action) {
866   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
867   0 allToAllV 100 1 7 10 12 100 1 70 10 5
868
869   where: 
870   1) 100 is the size of the send buffer *sizeof(int),
871   2) 1 7 10 12 is the sendcounts array
872   3) 100*sizeof(int) is the size of the receiver buffer
873   4)  1 70 10 5 is the recvcounts array */
874   double clock = smpi_process_simulated_elapsed();
875
876   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
877   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
878   int *sendcounts = xbt_new0(int, comm_size);  
879   int *recvcounts = xbt_new0(int, comm_size);  
880   int *senddisps = xbt_new0(int, comm_size);  
881   int *recvdisps = xbt_new0(int, comm_size);  
882
883   MPI_Datatype MPI_CURRENT_TYPE2;
884
885   int send_buf_size=parse_double(action[2]);
886   int recv_buf_size=parse_double(action[3+comm_size]);
887   if(action[4+2*comm_size] && action[5+2*comm_size]) {
888     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
889     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
890   }
891   else{
892     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
893     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
894   }
895
896   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
897   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
898
899   for(int i=0;i<comm_size;i++) {
900     sendcounts[i] = atoi(action[i+3]);
901     recvcounts[i] = atoi(action[i+4+comm_size]);
902   }
903
904   int rank = smpi_process_index();
905   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
906   extra->type = TRACING_ALLTOALLV;
907   extra->recvcounts= xbt_new(int, comm_size);
908   extra->sendcounts= xbt_new(int, comm_size);
909   extra->num_processes = comm_size;
910
911   for(int i=0; i< comm_size; i++){//copy data to avoid bad free
912     extra->send_size += sendcounts[i];
913     extra->sendcounts[i] = sendcounts[i];
914     extra->recv_size += recvcounts[i];
915     extra->recvcounts[i] = recvcounts[i];
916   }
917   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
918   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
919
920   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
921
922   mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
923                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
924
925   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
926   log_timed_action (action, clock);
927   xbt_free(sendcounts);
928   xbt_free(recvcounts);
929   xbt_free(senddisps);
930   xbt_free(recvdisps);
931 }
932
933 void smpi_replay_run(int *argc, char***argv){
934   /* First initializes everything */
935   smpi_process_init(argc, argv);
936   smpi_process_mark_as_initialized();
937   smpi_process_set_replaying(true);
938
939   int rank = smpi_process_index();
940   TRACE_smpi_init(rank);
941   TRACE_smpi_computing_init(rank);
942   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
943   extra->type = TRACING_INIT;
944   char *operation =bprintf("%s_init",__FUNCTION__);
945   TRACE_smpi_collective_in(rank, -1, operation, extra);
946   TRACE_smpi_collective_out(rank, -1, operation);
947   xbt_free(operation);
948
949   if (_xbt_replay_action_init()==0) {
950     xbt_replay_action_register("init",       action_init);
951     xbt_replay_action_register("finalize",   action_finalize);
952     xbt_replay_action_register("comm_size",  action_comm_size);
953     xbt_replay_action_register("comm_split", action_comm_split);
954     xbt_replay_action_register("comm_dup",   action_comm_dup);
955     xbt_replay_action_register("send",       action_send);
956     xbt_replay_action_register("Isend",      action_Isend);
957     xbt_replay_action_register("recv",       action_recv);
958     xbt_replay_action_register("Irecv",      action_Irecv);
959     xbt_replay_action_register("test",       action_test);
960     xbt_replay_action_register("wait",       action_wait);
961     xbt_replay_action_register("waitAll",    action_waitall);
962     xbt_replay_action_register("barrier",    action_barrier);
963     xbt_replay_action_register("bcast",      action_bcast);
964     xbt_replay_action_register("reduce",     action_reduce);
965     xbt_replay_action_register("allReduce",  action_allReduce);
966     xbt_replay_action_register("allToAll",   action_allToAll);
967     xbt_replay_action_register("allToAllV",  action_allToAllv);
968     xbt_replay_action_register("gather",  action_gather);
969     xbt_replay_action_register("gatherV",  action_gatherv);
970     xbt_replay_action_register("allGather",  action_allgather);
971     xbt_replay_action_register("allGatherV",  action_allgatherv);
972     xbt_replay_action_register("reduceScatter",  action_reducescatter);
973     xbt_replay_action_register("compute",    action_compute);
974   }
975
976   //if we have a delayed start, sleep here.
977   if(*argc>2){
978     char *endptr;
979     double value = strtod((*argv)[2], &endptr);
980     if (*endptr != '\0')
981       THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
982     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
983     smpi_execute_flops(value);
984   } else {
985     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
986     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
987     smpi_execute_flops(0.0);
988   }
989
990   /* Actually run the replay */
991   xbt_replay_action_runner(*argc, *argv);
992
993   /* and now, finalize everything */
994   double sim_time= 1.;
995   /* One active process will stop. Decrease the counter*/
996   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
997   if (!get_reqq_self()->empty()){
998     unsigned int count_requests=get_reqq_self()->size();
999     MPI_Request requests[count_requests];
1000     MPI_Status status[count_requests];
1001     unsigned int i=0;
1002
1003     for (auto req: *get_reqq_self()){
1004       requests[i] = req;
1005       i++;
1006     }
1007     smpi_mpi_waitall(count_requests, requests, status);
1008     active_processes--;
1009   } else {
1010     active_processes--;
1011   }
1012
1013   if(active_processes==0){
1014     /* Last process alive speaking */
1015     /* end the simulated timer */
1016     sim_time = smpi_process_simulated_elapsed();
1017     XBT_INFO("Simulation time %f", sim_time);
1018     _xbt_replay_action_exit();
1019     xbt_free(sendbuffer);
1020     xbt_free(recvbuffer);
1021   }
1022
1023   instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1024   extra_fin->type = TRACING_FINALIZE;
1025   operation =bprintf("%s_finalize",__FUNCTION__);
1026   TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1027
1028   smpi_process_finalize();
1029
1030   TRACE_smpi_collective_out(rank, -1, operation);
1031   TRACE_smpi_finalize(smpi_process_index());
1032   smpi_process_destroy();
1033   xbt_free(operation);
1034 }