1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
11 #include <unordered_map>
// NOTE(review): KEY_SIZE is not referenced by any code visible in this file section — confirm it is still needed.
14 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Declares the "smpi_replay" log sub-category (under "smpi") used by the XBT_* logging macros below.
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value read from the trace by the "comm_size" action (see action_comm_size()).
18 int communicator_size = 0;
// Set to smpi_process_count() by action_init(); smpi_replay_run() compares it with 0 to detect the last process.
19 static int active_processes = 0;
// Pending requests of each replayed process, keyed by smpi_process_index() (see get_reqq_self()/set_reqq_self()).
20 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line does not name one; selected in action_init().
22 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; also written by decode_datatype().
23 MPI_Datatype MPI_CURRENT_TYPE;
// Scratch buffers handed out by smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer() while replaying,
// with their current sizes; both buffers are freed in smpi_replay_run().
25 static int sendbuffer_size=0;
26 char* sendbuffer=nullptr;
27 static int recvbuffer_size=0;
28 char* recvbuffer=nullptr;
// Logs, at verbose level, the words of `action` joined by single spaces followed by the simulated
// time elapsed since `clock` (smpi_process_simulated_elapsed() - clock).
// `clock` is the value of smpi_process_simulated_elapsed() sampled by the caller when the action started.
// The joined string is only built when verbose logging is enabled for the smpi_replay category.
30 static void log_timed_action (const char *const *action, double clock){
31 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
32 char *name = xbt_str_join_array(action, " ");
33 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
// Returns the pending-request vector registered in `reqq` for the calling process.
// std::unordered_map::at() throws std::out_of_range when no vector was registered for this process index.
38 static std::vector<MPI_Request>* get_reqq_self()
40 return reqq.at(smpi_process_index());
// Registers `mpi_request` in `reqq` as the pending-request vector of the calling process.
// std::unordered_map::insert() keeps an already existing entry, so a second registration for the
// same process index is ignored.
43 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
45 reqq.insert({smpi_process_index(), mpi_request});
48 //allocate a single buffer for all sends, growing it if needed
// size: number of bytes requested.
// Outside of replay mode a fresh xbt_malloc()'ed block of `size` bytes is returned instead.
// While replaying, the shared `sendbuffer` is reallocated only when it is smaller than `size`.
// NOTE(review): presumably sendbuffer_size is then updated and `sendbuffer` returned — confirm.
49 void* smpi_get_tmp_sendbuffer(int size)
51 if (!smpi_process_get_replaying())
52 return xbt_malloc(size);
53 if (sendbuffer_size<size){
54 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
60 //allocate a single buffer for all recv
// size: number of bytes requested.
// Outside of replay mode a fresh xbt_malloc()'ed block of `size` bytes is returned instead.
// While replaying, the shared `recvbuffer` is reallocated only when it is smaller than `size`.
// NOTE(review): presumably recvbuffer_size is then updated and `recvbuffer` returned — confirm.
61 void* smpi_get_tmp_recvbuffer(int size){
62 if (!smpi_process_get_replaying())
63 return xbt_malloc(size);
64 if (recvbuffer_size<size){
65 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Counterpart of smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer() for a buffer `buf` they returned.
// Only acts when the process is not replaying, i.e. when `buf` was individually xbt_malloc()'ed.
// NOTE(review): presumably releases `buf` in that case — confirm.
71 void smpi_free_tmp_buffer(void* buf){
72 if (!smpi_process_get_replaying())
// Converts `string` to a double with strtod().
// Raises unknown_error through THROWF ("<string> is not a double") on invalid input.
// NOTE(review): the guard presumably checks `endptr` for unparsed trailing characters — confirm.
77 static double parse_double(const char *string)
81 value = strtod(string, &endptr);
83 THROWF(unknown_error, 0, "%s is not a double", string);
// Translates the numeric datatype id found in a trace (`action`, parsed with atoi()) into an
// MPI datatype. The result is stored in the global MPI_CURRENT_TYPE and also returned.
// NOTE(review): the case labels are presumably 0..6 in the order of the assignments below, with
// MPI_DEFAULT_TYPE as the fallback for any other id — confirm.
87 static MPI_Datatype decode_datatype(const char *const action)
89 // Declared datatypes,
90 switch(atoi(action)) {
92 MPI_CURRENT_TYPE=MPI_DOUBLE;
95 MPI_CURRENT_TYPE=MPI_INT;
98 MPI_CURRENT_TYPE=MPI_CHAR;
101 MPI_CURRENT_TYPE=MPI_SHORT;
104 MPI_CURRENT_TYPE=MPI_LONG;
107 MPI_CURRENT_TYPE=MPI_FLOAT;
110 MPI_CURRENT_TYPE=MPI_BYTE;
113 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
115 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): returns the string identifier used in traces for `datatype`.
// datatype: compared against MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG and MPI_FLOAT.
// known: optional output flag; every caller in this file passes nullptr.
// NOTE(review): per the comments below, `known` presumably reports whether the datatype is
// handled by replay — confirm the values written and the strings returned.
119 const char* encode_datatype(MPI_Datatype datatype, int* known)
121 //default type for output is set to MPI_BYTE
122 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
125 if (datatype==MPI_BYTE){
128 if(datatype==MPI_DOUBLE)
130 if(datatype==MPI_INT)
132 if(datatype==MPI_CHAR)
134 if(datatype==MPI_SHORT)
136 if(datatype==MPI_LONG)
138 if(datatype==MPI_FLOAT)
140 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
143 // default - not implemented.
144 // do not warn here as we pass in this function even for other trace formats
// Argument-count check used at the top of the action handlers.
// Counts the entries of `action` up to its nullptr terminator and raises arg_error through THROWF
// when the line is too short. The first two items of a line are the process id and the action name.
// `mandatory` and `optional` are the numbers of arguments expected after those two; `optional`
// is only used in the error message.
// NOTE(review): the failing condition is presumably "fewer than mandatory+2 items" — confirm.
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
150 while(action[i]!=nullptr)\
153 THROWF(arg_error, 0, "%s replay failed.\n" \
154 "%d items were given on the line. First two should be process_id and action. " \
155 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler for the "init" trace action: selects the default datatype, starts the simulated timer,
// records the number of processes and registers an empty pending-request vector for this process.
// action: words of the trace line; one optional argument is accepted.
159 static void action_init(const char *const *action)
161 XBT_DEBUG("Initialize the counters");
162 CHECK_ACTION_PARAMS(action, 0, 1)
// NOTE(review): the condition choosing between the two defaults is presumably the presence of
// the optional argument action[2] — confirm.
164 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
165 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
167 /* start a simulated timer */
168 smpi_process_simulated_start();
169 /*initialize the number of active processes */
170 active_processes = smpi_process_count();
// The vector allocated here is the one later returned by get_reqq_self().
172 set_reqq_self(new std::vector<MPI_Request>);
// Handler for the "finalize" trace action.
// NOTE(review): presumably a no-op, since the actual finalization is done in smpi_replay_run() — confirm.
175 static void action_finalize(const char *const *action)
// Handler for the "comm_size" trace action: stores action[2] in the global communicator_size
// and logs the elapsed simulated time.
180 static void action_comm_size(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
// parse_double() returns a double; the assignment converts it to int.
184 communicator_size = parse_double(action[2]);
185 log_timed_action (action, clock);
// Handler for the "comm_split" trace action.
// The statements shown only sample the simulated clock and log the action; no communicator is created here.
188 static void action_comm_split(const char *const *action)
190 double clock = smpi_process_simulated_elapsed();
192 log_timed_action (action, clock);
// Handler for the "comm_dup" trace action.
// The statements shown only sample the simulated clock and log the action; no communicator is duplicated here.
195 static void action_comm_dup(const char *const *action)
197 double clock = smpi_process_simulated_elapsed();
199 log_timed_action (action, clock);
// Handler for the "compute" trace action: simulates the execution of action[2] flops.
// action: words of the trace line; exactly one mandatory argument (the flop amount).
202 static void action_compute(const char *const *action)
204 CHECK_ACTION_PARAMS(action, 1, 0)
205 double clock = smpi_process_simulated_elapsed();
206 double flops= parse_double(action[2]);
207 int rank = smpi_process_index();
// Tracing record describing this computation, handed to TRACE_smpi_computing_in().
208 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
209 extra->type=TRACING_COMPUTING;
210 extra->comp_size=flops;
211 TRACE_smpi_computing_in(rank, extra);
213 smpi_execute_flops(flops);
215 TRACE_smpi_computing_out(rank);
216 log_timed_action (action, clock);
// Handler for the "send" trace action: replays a blocking send of action[3] elements to
// process action[2] on MPI_COMM_WORLD with tag 0.
// action: words of the trace line; two mandatory arguments (destination, size) and one optional
// argument (datatype id, see decode_datatype()).
219 static void action_send(const char *const *action)
221 CHECK_ACTION_PARAMS(action, 2, 1)
222 int to = atoi(action[2]);
223 double size=parse_double(action[3]);
224 double clock = smpi_process_simulated_elapsed();
// NOTE(review): the if/else selecting one of the next two assignments is presumably keyed on
// the presence of action[4] — confirm.
227 MPI_CURRENT_TYPE=decode_datatype(action[4]);
229 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
231 int rank = smpi_process_index();
233 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
// Tracing record for this point-to-point operation.
234 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
235 extra->type = TRACING_SEND;
236 extra->send_size = size;
238 extra->dst = dst_traced;
239 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
240 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event itself is only traced here when internals are not being traced.
241 if (!TRACE_smpi_view_internals()) {
242 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// No payload is transferred: the buffer is nullptr, only the element count and type matter.
245 smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
247 log_timed_action (action, clock);
249 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler for the "Isend" trace action: replays a non-blocking send of action[3] elements to
// process action[2] on MPI_COMM_WORLD with tag 0, and queues the request for a later test/wait.
// action: words of the trace line; two mandatory arguments (destination, size) and one optional
// argument (datatype id, see decode_datatype()).
252 static void action_Isend(const char *const *action)
254 CHECK_ACTION_PARAMS(action, 2, 1)
255 int to = atoi(action[2]);
256 double size=parse_double(action[3]);
257 double clock = smpi_process_simulated_elapsed();
// NOTE(review): the if/else selecting one of the next two assignments is presumably keyed on
// the presence of action[4] — confirm.
261 MPI_CURRENT_TYPE=decode_datatype(action[4]);
263 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
265 int rank = smpi_process_index();
266 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
// Tracing record for this point-to-point operation.
267 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
268 extra->type = TRACING_ISEND;
269 extra->send_size = size;
271 extra->dst = dst_traced;
272 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
273 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event itself is only traced here when internals are not being traced.
274 if (!TRACE_smpi_view_internals()) {
275 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// No payload is transferred: the buffer is nullptr.
278 request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
280 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Remember the request so that a following "test", "wait" or "waitAll" action can complete it.
283 get_reqq_self()->push_back(request);
285 log_timed_action (action, clock);
// Handler for the "recv" trace action: replays a blocking receive of action[3] elements from
// process action[2] on MPI_COMM_WORLD with tag 0.
// action: words of the trace line; two mandatory arguments (source, size) and one optional
// argument (datatype id, see decode_datatype()).
288 static void action_recv(const char *const *action) {
289 CHECK_ACTION_PARAMS(action, 2, 1)
290 int from = atoi(action[2]);
291 double size=parse_double(action[3]);
292 double clock = smpi_process_simulated_elapsed();
// NOTE(review): the if/else selecting one of the next two assignments is presumably keyed on
// the presence of action[4] — confirm.
296 MPI_CURRENT_TYPE=decode_datatype(action[4]);
298 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
300 int rank = smpi_process_index();
301 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
// Tracing record for this point-to-point operation.
303 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
304 extra->type = TRACING_RECV;
305 extra->send_size = size;
306 extra->src = src_traced;
308 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
309 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
311 //unknown size from the receiver point of view
// NOTE(review): the probe is presumably guarded so that it only runs when the trace gives no
// usable size, with `size` then taken from `status` — confirm.
313 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
// No payload is stored: the buffer is nullptr.
317 smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
319 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// The receive event itself is only traced here when internals are not being traced.
320 if (!TRACE_smpi_view_internals()) {
321 TRACE_smpi_recv(rank, src_traced, rank);
324 log_timed_action (action, clock);
// Handler for the "Irecv" trace action: replays a non-blocking receive of action[3] elements from
// process action[2] on MPI_COMM_WORLD with tag 0, and queues the request for a later test/wait.
// action: words of the trace line; two mandatory arguments (source, size) and one optional
// argument (datatype id, see decode_datatype()).
327 static void action_Irecv(const char *const *action)
329 CHECK_ACTION_PARAMS(action, 2, 1)
330 int from = atoi(action[2]);
331 double size=parse_double(action[3]);
332 double clock = smpi_process_simulated_elapsed();
// NOTE(review): the if/else selecting one of the next two assignments is presumably keyed on
// the presence of action[4] — confirm.
336 MPI_CURRENT_TYPE=decode_datatype(action[4]);
338 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
340 int rank = smpi_process_index();
341 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
// Tracing record for this point-to-point operation.
342 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
343 extra->type = TRACING_IRECV;
344 extra->send_size = size;
345 extra->src = src_traced;
347 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
348 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
350 //unknown size from the receiver point of view
// NOTE(review): the probe is presumably guarded so that it only runs when the trace gives no
// usable size, with `size` then taken from `status` — confirm.
352 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
// No payload is stored: the buffer is nullptr.
356 request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
358 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Remember the request so that a following "test", "wait" or "waitAll" action can complete it.
360 get_reqq_self()->push_back(request);
362 log_timed_action (action, clock);
// Handler for the "test" trace action: tests the most recently queued request of this process.
// action: words of the trace line; no argument is expected.
365 static void action_test(const char *const *action){
366 CHECK_ACTION_PARAMS(action, 0, 0)
367 double clock = smpi_process_simulated_elapsed();
// Take the last queued request off the vector; it is pushed back below after the test.
// NOTE(review): no emptiness check is visible before back() — confirm a request is always queued
// at this point.
371 MPI_Request request = get_reqq_self()->back();
372 get_reqq_self()->pop_back();
373 //if request is null here, this may mean that a previous test has succeeded
374 //Different times in traced application and replayed version may lead to this
375 //In this case, ignore the extra calls.
376 if(request!=nullptr){
377 int rank = smpi_process_index();
378 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
379 extra->type=TRACING_TEST;
380 TRACE_smpi_testing_in(rank, extra);
382 flag = smpi_mpi_test(&request, &status);
384 XBT_DEBUG("MPI_Test result: %d", flag);
385 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
386 get_reqq_self()->push_back(request);
388 TRACE_smpi_testing_out(rank);
390 log_timed_action (action, clock);
// Handler for the "wait" trace action: waits for the most recently queued request of this process.
// action: words of the trace line; no argument is expected.
// Asserts that at least one request was queued by a preceding Isend/Irecv.
393 static void action_wait(const char *const *action){
394 CHECK_ACTION_PARAMS(action, 0, 0)
395 double clock = smpi_process_simulated_elapsed();
398 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
399 xbt_str_join_array(action," "));
// Unlike action_test(), the request is removed for good: it is not pushed back after the wait.
400 MPI_Request request = get_reqq_self()->back();
401 get_reqq_self()->pop_back();
403 if (request==nullptr){
404 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture everything needed for tracing before the wait, which receives &request and may change it.
408 int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
410 MPI_Group group = smpi_comm_group(request->comm);
411 int src_traced = smpi_group_rank(group, request->src);
412 int dst_traced = smpi_group_rank(group, request->dst);
413 int is_wait_for_receive = request->recv;
414 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
415 extra->type = TRACING_WAIT;
416 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
418 smpi_mpi_wait(&request, &status);
420 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
// Completing a receive request is what produces the traced receive event.
421 if (is_wait_for_receive)
422 TRACE_smpi_recv(rank, src_traced, dst_traced);
423 log_timed_action (action, clock);
// Handler for the "waitAll" trace action: waits for every request currently queued by this process.
// action: words of the trace line; no argument is expected.
// Nothing but the final log happens when no request is queued.
426 static void action_waitall(const char *const *action){
427 CHECK_ACTION_PARAMS(action, 0, 0)
428 double clock = smpi_process_simulated_elapsed();
429 unsigned int count_requests=get_reqq_self()->size();
431 if (count_requests>0) {
// One status slot per request. A std::vector is used rather than a variable-length array, which
// is not standard C++ and would put a request-count-sized object on the stack.
432 std::vector<MPI_Status> status(count_requests);
434 int rank_traced = smpi_process_index();
435 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
436 extra->type = TRACING_WAITALL;
437 extra->send_size=count_requests;
438 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
// The request vector's own storage is handed to waitall as the request array.
440 smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status.data());
// Trace a receive event for every still non-null request that was a receive.
442 for (auto req : *(get_reqq_self())){
443 if (req && req->recv)
444 TRACE_smpi_recv(rank_traced, req->src, req->dst);
446 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
448 log_timed_action (action, clock);
// Handler for the "barrier" trace action: replays a barrier on MPI_COMM_WORLD.
// action: words of the trace line; only used for logging.
451 static void action_barrier(const char *const *action){
452 double clock = smpi_process_simulated_elapsed();
453 int rank = smpi_process_index();
// Tracing record for this collective operation.
454 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
455 extra->type = TRACING_BARRIER;
456 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
458 mpi_coll_barrier_fun(MPI_COMM_WORLD);
460 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
461 log_timed_action (action, clock);
// Handler for the "bcast" trace action: replays a broadcast of action[2] elements on MPI_COMM_WORLD.
// action: words of the trace line; one mandatory argument (size) and two optional ones
// (root in action[3], datatype id in action[4], see decode_datatype()).
464 static void action_bcast(const char *const *action)
466 CHECK_ACTION_PARAMS(action, 1, 2)
467 double size = parse_double(action[2]);
468 double clock = smpi_process_simulated_elapsed();
470 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
471 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// NOTE(review): the next two assignments are presumably guarded by the presence of action[3]
// and action[4] respectively, with a default root declared beforehand — confirm.
474 root= atoi(action[3]);
476 MPI_CURRENT_TYPE=decode_datatype(action[4]);
480 int rank = smpi_process_index();
// NOTE(review): this uses smpi_group_index() whereas action_reduce() uses smpi_group_rank() for
// the same purpose — confirm which conversion is intended.
481 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
// Tracing record for this collective operation.
483 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
484 extra->type = TRACING_BCAST;
485 extra->send_size = size;
486 extra->root = root_traced;
487 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
488 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Scratch buffer large enough for `size` elements of the current datatype.
489 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
491 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
493 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
494 log_timed_action (action, clock);
// Handler for the "reduce" trace action: replays a reduction of action[2] elements on
// MPI_COMM_WORLD, then simulates action[3] flops for the reduction's computation.
// action: words of the trace line; two mandatory arguments (communication size, computation
// size) and two optional ones (root in action[4], datatype id in action[5], see decode_datatype()).
497 static void action_reduce(const char *const *action)
499 CHECK_ACTION_PARAMS(action, 2, 2)
500 double comm_size = parse_double(action[2]);
501 double comp_size = parse_double(action[3]);
502 double clock = smpi_process_simulated_elapsed();
504 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// NOTE(review): the next two assignments are presumably guarded by the presence of action[4]
// and action[5] respectively, with a default root declared beforehand — confirm.
507 root= atoi(action[4]);
509 MPI_CURRENT_TYPE=decode_datatype(action[5]);
513 int rank = smpi_process_index();
514 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
// Tracing record for this collective operation.
515 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
516 extra->type = TRACING_REDUCE;
517 extra->send_size = comm_size;
518 extra->comp_size = comp_size;
519 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
520 extra->root = root_traced;
522 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// The receive side uses the receive scratch buffer: taking both pointers from the send buffer
// would make sendbuf and recvbuf alias the same memory while replaying.
524 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
525 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// MPI_OP_NULL: no actual reduction operator is applied; its cost is simulated by the flops below.
526 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
527 smpi_execute_flops(comp_size);
529 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
530 log_timed_action (action, clock);
// Handler for the "allReduce" trace action: replays an all-reduce of action[2] elements on
// MPI_COMM_WORLD, then simulates action[3] flops for the reduction's computation.
// action: words of the trace line; two mandatory arguments (communication size, computation
// size) and one optional argument (datatype id in action[4], see decode_datatype()).
533 static void action_allReduce(const char *const *action) {
534 CHECK_ACTION_PARAMS(action, 2, 1)
535 double comm_size = parse_double(action[2]);
536 double comp_size = parse_double(action[3]);
// NOTE(review): the if/else selecting one of the next two assignments is presumably keyed on
// the presence of action[4] — confirm.
539 MPI_CURRENT_TYPE=decode_datatype(action[4]);
541 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
543 double clock = smpi_process_simulated_elapsed();
544 int rank = smpi_process_index();
// Tracing record for this collective operation.
545 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
546 extra->type = TRACING_ALLREDUCE;
547 extra->send_size = comm_size;
548 extra->comp_size = comp_size;
549 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
550 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// The receive side uses the receive scratch buffer: taking both pointers from the send buffer
// would make sendbuf and recvbuf alias the same memory while replaying.
552 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
553 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// MPI_OP_NULL: no actual reduction operator is applied; its cost is simulated by the flops below.
554 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
555 smpi_execute_flops(comp_size);
557 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
558 log_timed_action (action, clock);
// Handler for the "allToAll" trace action: replays an all-to-all exchange on MPI_COMM_WORLD in
// which every process sends action[2] elements to, and receives action[3] elements from, each process.
// action: words of the trace line; datatype ids are read from action[4] (send) and action[5]
// (receive) only when both are present, otherwise MPI_DEFAULT_TYPE is used for both.
561 static void action_allToAll(const char *const *action) {
562 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
563 //two optional (corresponding datatypes)
564 double clock = smpi_process_simulated_elapsed();
565 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// parse_double() returns a double; the counts are converted to int here.
566 int send_size = parse_double(action[2]);
567 int recv_size = parse_double(action[3]);
568 MPI_Datatype MPI_CURRENT_TYPE2;
570 if(action[4] && action[5]) {
571 MPI_CURRENT_TYPE=decode_datatype(action[4]);
572 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
575 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
576 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
// Each scratch buffer holds one chunk per process of the communicator.
579 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
580 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
582 int rank = smpi_process_index();
// Tracing record for this collective operation.
583 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
584 extra->type = TRACING_ALLTOALL;
585 extra->send_size = send_size;
586 extra->recv_size = recv_size;
587 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
588 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
590 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
592 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
594 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
595 log_timed_action (action, clock);
// Handler for the "gather" trace action: replays a gather on MPI_COMM_WORLD in which every
// process sends action[2] elements and the root receives action[3] elements from each process.
// action: words of the trace line; two mandatory arguments (send and receive counts) and three
// optional ones (root in action[4], send and receive datatype ids in action[5] and action[6]).
598 static void action_gather(const char *const *action) {
599 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
603 1) 68 is the sendcounts
604 2) 68 is the recvcounts
605 3) 0 is the root node
606 4) 0 is the send datatype id, see decode_datatype()
607 5) 0 is the recv datatype id, see decode_datatype()
609 CHECK_ACTION_PARAMS(action, 2, 3)
610 double clock = smpi_process_simulated_elapsed();
611 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// parse_double() returns a double; the counts are converted to int here.
612 int send_size = parse_double(action[2]);
613 int recv_size = parse_double(action[3]);
614 MPI_Datatype MPI_CURRENT_TYPE2;
// The datatype ids are in action[5] and action[6] (action[4] is the root), so all three entries
// must be present before decoding. Testing them left to right stops at the nullptr terminator
// and never passes nullptr to decode_datatype().
615 if(action[4] && action[5] && action[6]) {
616 MPI_CURRENT_TYPE=decode_datatype(action[5]);
617 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
619 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
620 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
622 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
623 void *recv = nullptr;
// NOTE(review): this assignment is presumably guarded by the presence of action[4], with a
// default root declared beforehand — confirm.
626 root=atoi(action[4]);
627 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// NOTE(review): the receive buffer is presumably only obtained on the root rank — confirm.
630 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
// Tracing record for this collective operation.
632 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
633 extra->type = TRACING_GATHER;
634 extra->send_size = send_size;
635 extra->recv_size = recv_size;
637 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
638 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
640 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
642 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
// NOTE(review): the matching collective_in above is given `root` while this call is given -1 — confirm.
644 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
645 log_timed_action (action, clock);
// Handler for the "gatherV" trace action: replays a gatherv on MPI_COMM_WORLD.
// action: words of the trace line; action[2] is the send count, action[3 .. 2+comm_size] are the
// per-process receive counts, then optionally the root (action[3+comm_size], rank 0 when absent)
// and the send/receive datatype ids (action[4+comm_size], action[5+comm_size]).
648 static void action_gatherv(const char *const *action) {
649 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
650 0 gatherV 68 68 10 10 10 0 0 0
653 1) 68 is the sendcount
654 2) 68 10 10 10 is the recvcounts
655 3) 0 is the root node
656 4) 0 is the send datatype id, see decode_datatype()
657 5) 0 is the recv datatype id, see decode_datatype()
660 double clock = smpi_process_simulated_elapsed();
661 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
662 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
// parse_double() returns a double; the count is converted to int here.
663 int send_size = parse_double(action[2]);
// Zero-initialised displacement and count arrays, one entry per process.
664 int *disps = xbt_new0(int, comm_size);
665 int *recvcounts = xbt_new0(int, comm_size);
668 MPI_Datatype MPI_CURRENT_TYPE2;
// The mandatory arguments only guarantee entries up to action[2+comm_size], so action[3+comm_size]
// may be the nullptr terminator. Testing it first keeps the two datatype lookups from reading
// past the end of the array.
669 if(action[3+comm_size] && action[4+comm_size] && action[5+comm_size]) {
670 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
671 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
673 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
674 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
676 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
677 void *recv = nullptr;
// recv_sum accumulates the total number of elements received across all processes.
678 for(i=0;i<comm_size;i++) {
679 recvcounts[i] = atoi(action[i+3]);
680 recv_sum=recv_sum+recvcounts[i];
// The root is optional: fall back to rank 0 instead of passing the terminator to atoi().
684 int root=action[3+comm_size] ? atoi(action[3+comm_size]) : 0;
685 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// NOTE(review): the receive buffer is presumably only obtained on the root rank — confirm.
688 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
// Tracing record for this collective operation.
690 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
691 extra->type = TRACING_GATHERV;
692 extra->send_size = send_size;
693 extra->recvcounts= xbt_new(int,comm_size);
694 for(i=0; i< comm_size; i++)//copy data to avoid bad free
695 extra->recvcounts[i] = recvcounts[i];
697 extra->num_processes = comm_size;
698 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
699 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
701 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
703 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
705 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
706 log_timed_action (action, clock);
707 xbt_free(recvcounts);
// Handler for the "reduceScatter" trace action: replays a reduce-scatter on MPI_COMM_WORLD, then
// simulates the flops given in the trace.
// action: words of the trace line; action[2 .. 1+comm_size] are the receive counts,
// action[2+comm_size] is the computation amount and action[3+comm_size] is an optional datatype id.
711 static void action_reducescatter(const char *const *action) {
712 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
713 0 reduceScatter 275427 275427 275427 204020 11346849 0
716 1) The first four values after the name of the action declare the recvcounts array
717 2) The value 11346849 is the amount of instructions
718 3) The last value corresponds to the datatype, see decode_datatype().
720 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
721 double clock = smpi_process_simulated_elapsed();
722 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
723 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
// parse_double() returns a double; the amount is converted to int here.
724 int comp_size = parse_double(action[2+comm_size]);
// Zero-initialised count and displacement arrays, one entry per process.
725 int *recvcounts = xbt_new0(int, comm_size);
726 int *disps = xbt_new0(int, comm_size);
728 int rank = smpi_process_index();
730 if(action[3+comm_size])
731 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
733 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
// NOTE(review): `size`, used below to dimension the buffers, is presumably accumulated from
// recvcounts inside this loop — confirm.
735 for(i=0;i<comm_size;i++) {
736 recvcounts[i] = atoi(action[i+2]);
// Tracing record for this collective operation.
741 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
742 extra->type = TRACING_REDUCE_SCATTER;
743 extra->send_size = 0;
744 extra->recvcounts= xbt_new(int, comm_size);
745 for(i=0; i< comm_size; i++)//copy data to avoid bad free
746 extra->recvcounts[i] = recvcounts[i];
747 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
748 extra->comp_size = comp_size;
749 extra->num_processes = comm_size;
751 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
753 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
754 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
// MPI_OP_NULL: no actual reduction operator is applied; its cost is simulated by the flops below.
756 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
757 smpi_execute_flops(comp_size);
759 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
760 xbt_free(recvcounts);
762 log_timed_action (action, clock);
// Handler for the "allGather" trace action: replays an allgather on MPI_COMM_WORLD.
// action: words of the trace line; action[2] is the send count, action[3] the receive count, and
// action[4]/action[5] the optional send/receive datatype ids (used only when both are present).
765 static void action_allgather(const char *const *action) {
766 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
767 0 allGather 275427 275427
770 1) 275427 is the sendcount
771 2) 275427 is the recvcount
772 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype(). */
773 double clock = smpi_process_simulated_elapsed();
775 CHECK_ACTION_PARAMS(action, 2, 2)
776 int sendcount=atoi(action[2]);
777 int recvcount=atoi(action[3]);
779 MPI_Datatype MPI_CURRENT_TYPE2;
781 if(action[4] && action[5]) {
782 MPI_CURRENT_TYPE = decode_datatype(action[4]);
783 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
785 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
786 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
788 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// NOTE(review): the receive buffer is sized for recvcount elements only, whereas action_gather()
// and action_allToAll() multiply by the communicator size — confirm this is large enough.
789 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
791 int rank = smpi_process_index();
// Tracing record for this collective operation.
792 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
793 extra->type = TRACING_ALLGATHER;
794 extra->send_size = sendcount;
795 extra->recv_size= recvcount;
796 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
797 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
798 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
800 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
802 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
804 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
805 log_timed_action (action, clock);
// Handler for the "allGatherV" trace action: replays an allgatherv on MPI_COMM_WORLD.
// action: words of the trace line; action[2] is the send count, action[3 .. 2+comm_size] are the
// per-process receive counts, and action[3+comm_size]/action[4+comm_size] the optional
// send/receive datatype ids (used only when both are present).
808 static void action_allgatherv(const char *const *action) {
809 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
810 0 allGatherV 275427 275427 275427 275427 204020
813 1) 275427 is the sendcount
814 2) The next four elements declare the recvcounts array
815 3) No more values mean that the datatype for sent and receive buffer
816 is the default one, see decode_datatype(). */
817 double clock = smpi_process_simulated_elapsed();
819 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
820 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
822 int sendcount=atoi(action[2]);
// Zero-initialised count and displacement arrays, one entry per process.
823 int *recvcounts = xbt_new0(int, comm_size);
824 int *disps = xbt_new0(int, comm_size);
826 MPI_Datatype MPI_CURRENT_TYPE2;
828 if(action[3+comm_size] && action[4+comm_size]) {
829 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
830 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
832 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
833 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
835 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// recv_sum accumulates the total number of elements received across all processes.
837 for(i=0;i<comm_size;i++) {
838 recvcounts[i] = atoi(action[i+3]);
839 recv_sum=recv_sum+recvcounts[i];
841 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
843 int rank = smpi_process_index();
// Tracing record for this collective operation.
844 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
845 extra->type = TRACING_ALLGATHERV;
846 extra->send_size = sendcount;
847 extra->recvcounts= xbt_new(int, comm_size);
848 for(i=0; i< comm_size; i++)//copy data to avoid bad free
849 extra->recvcounts[i] = recvcounts[i];
850 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
851 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
852 extra->num_processes = comm_size;
854 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
856 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
859 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
860 log_timed_action (action, clock);
861 xbt_free(recvcounts);
// Handler for the "allToAllV" trace action: replays an alltoallv on MPI_COMM_WORLD.
// action: words of the trace line; action[2] is the send buffer size, action[3 .. 2+comm_size]
// the send counts, action[3+comm_size] the receive buffer size, action[4+comm_size ..
// 3+2*comm_size] the receive counts, and action[4+2*comm_size]/action[5+2*comm_size] the optional
// send/receive datatype ids (used only when both are present).
865 static void action_allToAllv(const char *const *action) {
866 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
867 0 allToAllV 100 1 7 10 12 100 1 70 10 5
870 1) 100 is the size of the send buffer *sizeof(int),
871 2) 1 7 10 12 is the sendcounts array
872 3) 100*sizeof(int) is the size of the receiver buffer
873 4) 1 70 10 5 is the recvcounts array */
874 double clock = smpi_process_simulated_elapsed();
876 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
877 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
// Zero-initialised count and displacement arrays, one entry per process.
878 int *sendcounts = xbt_new0(int, comm_size);
879 int *recvcounts = xbt_new0(int, comm_size);
880 int *senddisps = xbt_new0(int, comm_size);
881 int *recvdisps = xbt_new0(int, comm_size);
883 MPI_Datatype MPI_CURRENT_TYPE2;
// parse_double() returns a double; the sizes are converted to int here.
885 int send_buf_size=parse_double(action[2]);
886 int recv_buf_size=parse_double(action[3+comm_size]);
887 if(action[4+2*comm_size] && action[5+2*comm_size]) {
888 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
889 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
892 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
893 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
896 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
897 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
899 for(int i=0;i<comm_size;i++) {
900 sendcounts[i] = atoi(action[i+3]);
901 recvcounts[i] = atoi(action[i+4+comm_size]);
904 int rank = smpi_process_index();
// Tracing record for this collective operation; send_size and recv_size start at 0 (xbt_new0)
// and accumulate the per-process counts below.
905 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
906 extra->type = TRACING_ALLTOALLV;
907 extra->recvcounts= xbt_new(int, comm_size);
908 extra->sendcounts= xbt_new(int, comm_size);
909 extra->num_processes = comm_size;
911 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
912 extra->send_size += sendcounts[i];
913 extra->sendcounts[i] = sendcounts[i];
914 extra->recv_size += recvcounts[i];
915 extra->recvcounts[i] = recvcounts[i];
917 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
918 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
920 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// The receive side must use MPI_CURRENT_TYPE2: it is the type the receive buffer was sized
// with and the one recorded in the trace record as datatype2.
922 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
923 MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
925 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
926 log_timed_action (action, clock);
927 xbt_free(sendcounts);
928 xbt_free(recvcounts);
// Entry point of the trace replay for one simulated process: initializes SMPI in replay mode,
// registers the action handlers, runs the replay, completes the requests still pending and
// finalizes the process.
// argc/argv: command line of the process, forwarded to smpi_process_init() and
// xbt_replay_action_runner(); (*argv)[2], when used, is a number of flops to execute before starting.
933 void smpi_replay_run(int *argc, char***argv){
934 /* First initializes everything */
935 smpi_process_init(argc, argv);
936 smpi_process_mark_as_initialized();
937 smpi_process_set_replaying(true);
939 int rank = smpi_process_index();
940 TRACE_smpi_init(rank);
941 TRACE_smpi_computing_init(rank);
// Trace an "<function name>_init" pseudo-collective marking the start of the replay.
942 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
943 extra->type = TRACING_INIT;
944 char *operation =bprintf("%s_init",__FUNCTION__);
945 TRACE_smpi_collective_in(rank, -1, operation, extra);
946 TRACE_smpi_collective_out(rank, -1, operation);
// Handlers are registered only when _xbt_replay_action_init() returns 0.
// NOTE(review): presumably that return value means "not initialized yet" — confirm.
949 if (_xbt_replay_action_init()==0) {
950 xbt_replay_action_register("init", action_init);
951 xbt_replay_action_register("finalize", action_finalize);
952 xbt_replay_action_register("comm_size", action_comm_size);
953 xbt_replay_action_register("comm_split", action_comm_split);
954 xbt_replay_action_register("comm_dup", action_comm_dup);
955 xbt_replay_action_register("send", action_send);
956 xbt_replay_action_register("Isend", action_Isend);
957 xbt_replay_action_register("recv", action_recv);
958 xbt_replay_action_register("Irecv", action_Irecv);
959 xbt_replay_action_register("test", action_test);
960 xbt_replay_action_register("wait", action_wait);
961 xbt_replay_action_register("waitAll", action_waitall);
962 xbt_replay_action_register("barrier", action_barrier);
963 xbt_replay_action_register("bcast", action_bcast);
964 xbt_replay_action_register("reduce", action_reduce);
965 xbt_replay_action_register("allReduce", action_allReduce);
966 xbt_replay_action_register("allToAll", action_allToAll);
967 xbt_replay_action_register("allToAllV", action_allToAllv);
968 xbt_replay_action_register("gather", action_gather);
969 xbt_replay_action_register("gatherV", action_gatherv);
970 xbt_replay_action_register("allGather", action_allgather);
971 xbt_replay_action_register("allGatherV", action_allgatherv);
972 xbt_replay_action_register("reduceScatter", action_reducescatter);
973 xbt_replay_action_register("compute", action_compute);
976 //if we have a delayed start, sleep here.
// NOTE(review): this branch is presumably guarded by a check that (*argv)[2] exists — confirm.
979 double value = strtod((*argv)[2], &endptr);
981 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
982 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
983 smpi_execute_flops(value);
985 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
986 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
987 smpi_execute_flops(0.0);
990 /* Actually run the replay */
991 xbt_replay_action_runner(*argc, *argv);
993 /* and now, finalize everything */
995 /* One active process will stop. Decrease the counter*/
996 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Complete whatever requests the trace left pending for this process.
997 if (!get_reqq_self()->empty()){
998 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): these two are variable-length arrays, which are a compiler extension in C++.
999 MPI_Request requests[count_requests];
1000 MPI_Status status[count_requests];
// NOTE(review): this loop presumably copies each pending request into `requests` — confirm.
1003 for (auto req: *get_reqq_self()){
1007 smpi_mpi_waitall(count_requests, requests, status);
// NOTE(review): active_processes is presumably decremented before this test (see the comment
// "Decrease the counter" above) — confirm.
1013 if(active_processes==0){
1014 /* Last process alive speaking */
1015 /* end the simulated timer */
1016 sim_time = smpi_process_simulated_elapsed();
1017 XBT_INFO("Simulation time %f", sim_time);
1018 _xbt_replay_action_exit();
// The shared scratch buffers are released once, by the last process.
1019 xbt_free(sendbuffer);
1020 xbt_free(recvbuffer);
// Trace an "<function name>_finalize" pseudo-collective around the process finalization.
1023 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1024 extra_fin->type = TRACING_FINALIZE;
1025 operation =bprintf("%s_finalize",__FUNCTION__);
1026 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1028 smpi_process_finalize();
1030 TRACE_smpi_collective_out(rank, -1, operation);
1031 TRACE_smpi_finalize(smpi_process_index());
1032 smpi_process_destroy();
1033 xbt_free(operation);