From: George Markomanolis Date: Thu, 31 Jan 2013 20:51:04 +0000 (+0200) Subject: Adding MPI_Alltoall and MPI_Waitall to the SMPI replay tool X-Git-Tag: v3_9_90~565 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/a0717ebd450c0d360a4ef5c5898d51e3b12cbb81 Adding MPI_Alltoall and MPI_Waitall to the SMPI replay tool --- diff --git a/src/smpi/smpi_replay.c b/src/smpi/smpi_replay.c index 843129146a..1f4b8421b0 100644 --- a/src/smpi/smpi_replay.c +++ b/src/smpi/smpi_replay.c @@ -14,6 +14,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI"); int communicator_size = 0; static int active_processes = 0; +xbt_dynar_t *reqq; static void log_timed_action (const char *const *action, double clock){ if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){ @@ -23,11 +24,13 @@ static void log_timed_action (const char *const *action, double clock){ } } + typedef struct { xbt_dynar_t isends; /* of MPI_Request */ xbt_dynar_t irecvs; /* of MPI_Request */ } s_smpi_replay_globals_t, *smpi_replay_globals_t; + /* Helper function */ static double parse_double(const char *string) { @@ -41,17 +44,27 @@ static double parse_double(const char *string) static void action_init(const char *const *action) { + int i; XBT_DEBUG("Initialize the counters"); smpi_replay_globals_t globals = xbt_new(s_smpi_replay_globals_t, 1); globals->isends = xbt_dynar_new(sizeof(MPI_Request),NULL); globals->irecvs = xbt_dynar_new(sizeof(MPI_Request),NULL); - + + smpi_process_set_user_data((void*) globals); /* start a simulated timer */ smpi_process_simulated_start(); /*initialize the number of active processes */ active_processes = smpi_process_count(); + + reqq=xbt_new0(xbt_dynar_t,active_processes); + + for(i=0;isend = 1; @@ -146,6 +160,7 @@ static void action_Isend(const char *const *action) #endif xbt_dynar_push(globals->isends,&request); + xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request); log_timed_action (action, clock); } @@ -190,11 +205,13 @@ static void action_Irecv(const char *const *action) #endif request = smpi_mpi_irecv(NULL, size, MPI_BYTE, from, 0, MPI_COMM_WORLD); + #ifdef HAVE_TRACING TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__); request->recv = 1; #endif xbt_dynar_push(globals->irecvs,&request); + xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request); log_timed_action (action, clock); } @@ -236,7 +253,76 @@ static void action_wait(const char *const *action){ static void action_waitall(const char *const *action){ double clock = smpi_process_simulated_elapsed(); -// smpi_mpi_waitall(count, requests, status); + int count_requests=0,req_counts=0,i=0; + smpi_replay_globals_t globals = + (smpi_replay_globals_t) smpi_process_get_user_data(); + + count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]); + + if (count_requests>0) { + MPI_Request requests[count_requests]; + MPI_Status status[count_requests]; + + for(i=0;isrc; + *adst = requests[i]->dst; + *arecv = requests[i]->recv; + xbt_dynar_insert_at(srcs, i, asrc); + xbt_dynar_insert_at(dsts, i, adst); + xbt_dynar_insert_at(recvs, i, arecv); + xbt_free(asrc); + xbt_free(adst); + xbt_free(arecv); + }else { + int *t = xbt_new(int, 1); + xbt_dynar_insert_at(srcs, i, t); + xbt_dynar_insert_at(dsts, i, t); + xbt_dynar_insert_at(recvs, i, t); + xbt_free(t); + } + } + int rank_traced = smpi_process_index(); + TRACE_smpi_computing_out(rank_traced); + + TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__); + #endif + + smpi_mpi_waitall(count_requests, requests, status); + + #ifdef HAVE_TRACING + for (i = 0; i < count_requests; i++) { + int src_traced, dst_traced, is_wait_for_receive; + xbt_dynar_get_cpy(srcs, i, &src_traced); + xbt_dynar_get_cpy(dsts, i, &dst_traced); + xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive); + if (is_wait_for_receive) { + TRACE_smpi_recv(rank_traced, src_traced, dst_traced); + } + } + TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__); + //clean-up of dynars + xbt_dynar_free(&srcs); + xbt_dynar_free(&dsts); + xbt_dynar_free(&recvs); + TRACE_smpi_computing_in(rank_traced); + #endif + + xbt_dynar_reset(reqq[smpi_comm_rank(MPI_COMM_WORLD)]); + } log_timed_action (action, clock); } @@ -317,9 +403,12 @@ static void action_allReduce(const char *const *action) { static void action_allToAll(const char *const *action) { double clock = smpi_process_simulated_elapsed(); - double comm_size = smpi_comm_size(MPI_COMM_WORLD); - double send_size = parse_double(action[2]); - double recv_size = parse_double(action[3]); + int comm_size = smpi_comm_size(MPI_COMM_WORLD); + int send_size = atoi(action[2]); + int recv_size = atoi(action[3]); + void *send = xbt_new0(int, send_size*comm_size); + void *recv = xbt_new0(int, send_size*comm_size); + #ifdef HAVE_TRACING int rank = smpi_process_index(); @@ -328,16 +417,17 @@ static void action_allToAll(const char *const *action) { #endif if (send_size < 200 && comm_size > 12) { - smpi_coll_tuned_alltoall_bruck(NULL, send_size, MPI_BYTE, - NULL, recv_size, MPI_BYTE, + smpi_coll_tuned_alltoall_bruck(send, send_size, MPI_BYTE, + recv, recv_size, MPI_BYTE, MPI_COMM_WORLD); } else if (send_size < 3000) { - smpi_coll_tuned_alltoall_basic_linear(NULL, send_size, MPI_BYTE, - NULL, recv_size, MPI_BYTE, + + smpi_coll_tuned_alltoall_basic_linear(send, send_size, MPI_BYTE, + recv, recv_size, MPI_BYTE, MPI_COMM_WORLD); } else { - smpi_coll_tuned_alltoall_pairwise(NULL, send_size, MPI_BYTE, - NULL, recv_size, MPI_BYTE, + smpi_coll_tuned_alltoall_pairwise(send, send_size, MPI_BYTE, + recv, recv_size, MPI_BYTE, MPI_COMM_WORLD); } @@ -347,12 +437,21 @@ static void action_allToAll(const char *const *action) { #endif log_timed_action (action, clock); + xbt_free(send); + xbt_free(recv); } static void action_allToAllv(const char *const *action) { double clock = smpi_process_simulated_elapsed(); + int comm_size = smpi_comm_size(MPI_COMM_WORLD); +// PMPI_Alltoallv(NULL, send_size, send_disp, +// MPI_BYTE, NULL, recv_size, +// recv_disp, MPI_BYTE, MPI_COMM_WORLD); + + log_timed_action (action, clock); + } void smpi_replay_init(int *argc, char***argv){ @@ -386,10 +485,10 @@ int smpi_replay_finalize(){ double sim_time= 1.; /* One active process will stop. Decrease the counter*/ active_processes--; - if(!active_processes){ /* Last process alive speaking */ /* end the simulated timer */ + xbt_dynar_free(reqq); sim_time = smpi_process_simulated_elapsed(); XBT_INFO("Simulation time %g", sim_time); _xbt_replay_action_exit();