From 09304e54d26cb2a5b0d3f67bd31f95c982d7c299 Mon Sep 17 00:00:00 2001 From: degomme Date: Wed, 5 Apr 2017 17:03:26 +0200 Subject: [PATCH] Add MPI_Rput, Rget, Raccumulate and Rget_accumulate calls. They return the request for separate termination instead of putting it in the queue. --- include/smpi/smpi.h | 11 +++ src/smpi/smpi_mpi.cpp | 4 ++ src/smpi/smpi_pmpi.cpp | 160 +++++++++++++++++++++++++++++++++++++++++ src/smpi/smpi_win.cpp | 51 ++++++++----- src/smpi/smpi_win.hpp | 9 +-- 5 files changed, 215 insertions(+), 20 deletions(-) diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h index 500b64bb57..502c6ae6f8 100644 --- a/include/smpi/smpi.h +++ b/include/smpi/smpi.h @@ -566,6 +566,17 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Accumulate,( void *origin_addr, int origin_count, MPI_CALL(XBT_PUBLIC(int), MPI_Get_accumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void* result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win)); + +MPI_CALL(XBT_PUBLIC(int), MPI_Rget,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win, MPI_Request* request)); +MPI_CALL(XBT_PUBLIC(int), MPI_Rput,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win, MPI_Request* request)); +MPI_CALL(XBT_PUBLIC(int), MPI_Raccumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, + int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request* request)); +MPI_CALL(XBT_PUBLIC(int), MPI_Rget_accumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, + void* result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, + int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request* request)); + MPI_CALL(XBT_PUBLIC(int), MPI_Fetch_and_op,( void *origin_addr, void* result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp, MPI_Op op, MPI_Win win)); MPI_CALL(XBT_PUBLIC(int), MPI_Compare_and_swap, (void *origin_addr, void *compare_addr, diff --git a/src/smpi/smpi_mpi.cpp b/src/smpi/smpi_mpi.cpp index 53f8c64f33..35ae81d2b1 100644 --- a/src/smpi/smpi_mpi.cpp +++ b/src/smpi/smpi_mpi.cpp @@ -128,6 +128,7 @@ WRAPPED_PMPI_CALL(int,MPI_Pack,(void* inbuf, int incount, MPI_Datatype type, voi WRAPPED_PMPI_CALL(int,MPI_Probe,(int source, int tag, MPI_Comm comm, MPI_Status* status) ,(source, tag, comm, status)) WRAPPED_PMPI_CALL(int,MPI_Put,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win),(origin_addr,origin_count, origin_datatype,target_rank,target_disp, target_count,target_datatype, win)) WRAPPED_PMPI_CALL(int,MPI_Query_thread,(int *provided),(provided)) +WRAPPED_PMPI_CALL(int,MPI_Raccumulate,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request* request),( origin_addr,origin_count, origin_datatype,target_rank,target_disp, target_count,target_datatype,op, win, request)) WRAPPED_PMPI_CALL(int,MPI_Recv_init,(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request * request),(buf, count, datatype, src, tag, comm, request)) WRAPPED_PMPI_CALL(int,MPI_Recv,(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status * status),(buf, count, datatype, src, tag, comm, status)) WRAPPED_PMPI_CALL(int,MPI_Reduce_local,(void *inbuf, void *inoutbuf, int count, MPI_Datatype datatype, MPI_Op op),(inbuf, inoutbuf, count, datatype, op)) @@ -135,6 +136,9 @@ WRAPPED_PMPI_CALL(int,MPI_Reduce_scatter_block,(void *sendbuf, void *recvbuf, in WRAPPED_PMPI_CALL(int,MPI_Reduce_scatter,(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm),(sendbuf, recvbuf, recvcounts, datatype, op, comm)) WRAPPED_PMPI_CALL(int,MPI_Reduce,(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm),(sendbuf, recvbuf, count, datatype, op, root, comm)) WRAPPED_PMPI_CALL(int,MPI_Request_free,(MPI_Request * request),(request)) +WRAPPED_PMPI_CALL(int,MPI_Rget,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank,MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win, MPI_Request* request),(origin_addr,origin_count, origin_datatype,target_rank, target_disp, target_count,target_datatype,win, request)) +WRAPPED_PMPI_CALL(int,MPI_Rget_accumulate, (void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request* request),(origin_addr, origin_count, origin_datatype, result_addr, result_count, result_datatype, target_rank, target_disp, target_count, target_datatype, op, win, request)) +WRAPPED_PMPI_CALL(int,MPI_Rput,( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win, MPI_Request* request),(origin_addr,origin_count, origin_datatype,target_rank,target_disp, target_count,target_datatype, win, request)) WRAPPED_PMPI_CALL(int,MPI_Scan,(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm),(sendbuf, recvbuf, count, datatype, op, comm)) WRAPPED_PMPI_CALL(int,MPI_Scatter,(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount, MPI_Datatype recvtype,int root, MPI_Comm comm),(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm)) WRAPPED_PMPI_CALL(int,MPI_Scatterv,(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,MPI_Datatype recvtype, int root, MPI_Comm comm),(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm)) diff --git a/src/smpi/smpi_pmpi.cpp b/src/smpi/smpi_pmpi.cpp index 65c0abd74b..d68f54c12e 100644 --- a/src/smpi/smpi_pmpi.cpp +++ b/src/smpi/smpi_pmpi.cpp @@ -2639,6 +2639,43 @@ int PMPI_Get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, return retval; } +int PMPI_Rget( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win, MPI_Request* request){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (target_rank == MPI_PROC_NULL) { + *request = MPI_REQUEST_NULL; + retval = MPI_SUCCESS; + } else if (target_rank <0){ + retval = MPI_ERR_RANK; + } else if (win->dynamic()==0 && target_disp <0){ + //in case of dynamic window, target_disp can be mistakenly seen as negative, as it is an address + retval = MPI_ERR_ARG; + } else if ((origin_count < 0 || target_count < 0) || + (origin_addr==nullptr && origin_count > 0)){ + retval = MPI_ERR_COUNT; + } else if ((!origin_datatype->is_valid()) || (!target_datatype->is_valid())) { + retval = MPI_ERR_TYPE; + } else if(request == nullptr){ + retval = MPI_ERR_REQUEST; + } else { + int rank = smpi_process()->index(); + MPI_Group group; + win->get_group(&group); + int src_traced = group->index(target_rank); + TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, nullptr); + + retval = win->get( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, + target_datatype, request); + + TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + int PMPI_Put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win){ int retval = 0; @@ -2674,6 +2711,44 @@ int PMPI_Put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, return retval; } +int PMPI_Rput( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win, MPI_Request* request){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (target_rank == MPI_PROC_NULL) { + *request = MPI_REQUEST_NULL; + retval = MPI_SUCCESS; + } else if (target_rank <0){ + retval = MPI_ERR_RANK; + } else if (win->dynamic()==0 && target_disp <0){ + //in case of dynamic window, target_disp can be mistakenly seen as negative, as it is an address + retval = MPI_ERR_ARG; + } else if ((origin_count < 0 || target_count < 0) || + (origin_addr==nullptr && origin_count > 0)){ + retval = MPI_ERR_COUNT; + } else if ((!origin_datatype->is_valid()) || (!target_datatype->is_valid())) { + retval = MPI_ERR_TYPE; + } else if(request == nullptr){ + retval = MPI_ERR_REQUEST; + } else { + int rank = smpi_process()->index(); + MPI_Group group; + win->get_group(&group); + int dst_traced = group->index(target_rank); + TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, nullptr); + TRACE_smpi_send(rank, rank, dst_traced, SMPI_RMA_TAG, origin_count*origin_datatype->size()); + + retval = win->put( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, + target_datatype, request); + + TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + int PMPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win){ int retval = 0; @@ -2711,6 +2786,46 @@ int PMPI_Accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_da return retval; } +int PMPI_Raccumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request* request){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (target_rank == MPI_PROC_NULL) { + *request = MPI_REQUEST_NULL; + retval = MPI_SUCCESS; + } else if (target_rank <0){ + retval = MPI_ERR_RANK; + } else if (win->dynamic()==0 && target_disp <0){ + //in case of dynamic window, target_disp can be mistakenly seen as negative, as it is an address + retval = MPI_ERR_ARG; + } else if ((origin_count < 0 || target_count < 0) || + (origin_addr==nullptr && origin_count > 0)){ + retval = MPI_ERR_COUNT; + } else if ((!origin_datatype->is_valid()) || + (!target_datatype->is_valid())) { + retval = MPI_ERR_TYPE; + } else if (op == MPI_OP_NULL) { + retval = MPI_ERR_OP; + } else if(request == nullptr){ + retval = MPI_ERR_REQUEST; + } else { + int rank = smpi_process()->index(); + MPI_Group group; + win->get_group(&group); + int src_traced = group->index(target_rank); + TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, nullptr); + + retval = win->accumulate( origin_addr, origin_count, origin_datatype, target_rank, target_disp, target_count, + target_datatype, op, request); + + TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + int PMPI_Get_accumulate(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Win win){ @@ -2752,6 +2867,51 @@ MPI_Datatype target_datatype, MPI_Op op, MPI_Win win){ return retval; } + +int PMPI_Rget_accumulate(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, +int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, +MPI_Datatype target_datatype, MPI_Op op, MPI_Win win, MPI_Request* request){ + int retval = 0; + smpi_bench_end(); + if (win == MPI_WIN_NULL) { + retval = MPI_ERR_WIN; + } else if (target_rank == MPI_PROC_NULL) { + *request = MPI_REQUEST_NULL; + retval = MPI_SUCCESS; + } else if (target_rank <0){ + retval = MPI_ERR_RANK; + } else if (win->dynamic()==0 && target_disp <0){ + //in case of dynamic window, target_disp can be mistakenly seen as negative, as it is an address + retval = MPI_ERR_ARG; + } else if ((origin_count < 0 || target_count < 0 || result_count <0) || + (origin_addr==nullptr && origin_count > 0) || + (result_addr==nullptr && result_count > 0)){ + retval = MPI_ERR_COUNT; + } else if ((!origin_datatype->is_valid()) || + (!target_datatype->is_valid())|| + (!result_datatype->is_valid())) { + retval = MPI_ERR_TYPE; + } else if (op == MPI_OP_NULL) { + retval = MPI_ERR_OP; + } else if(request == nullptr){ + retval = MPI_ERR_REQUEST; + } else { + int rank = smpi_process()->index(); + MPI_Group group; + win->get_group(&group); + int src_traced = group->index(target_rank); + TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, nullptr); + + retval = win->get_accumulate( origin_addr, origin_count, origin_datatype, result_addr, + result_count, result_datatype, target_rank, target_disp, + target_count, target_datatype, op, request); + + TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__); + } + smpi_bench_begin(); + return retval; +} + int PMPI_Fetch_and_op(void *origin_addr, void *result_addr, MPI_Datatype dtype, int target_rank, MPI_Aint target_disp, MPI_Op op, MPI_Win win){ return PMPI_Get_accumulate(origin_addr, origin_addr==nullptr?0:1, dtype, result_addr, 1, dtype, target_rank, target_disp, 1, dtype, op, win); } diff --git a/src/smpi/smpi_win.cpp b/src/smpi/smpi_win.cpp index 48b992dbef..28f399f4d0 100644 --- a/src/smpi/smpi_win.cpp +++ b/src/smpi/smpi_win.cpp @@ -181,7 +181,7 @@ int Win::fence(int assert) } int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype) + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request) { //get receiver pointer MPI_Win recv_win = connected_wins_[target_rank]; @@ -213,24 +213,32 @@ int Win::put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, //start send sreq->start(); + + if(request!=nullptr){ + *request=sreq; + }else{ + xbt_mutex_acquire(mut_); + requests_->push_back(sreq); + xbt_mutex_release(mut_); + } + //push request to receiver's win xbt_mutex_acquire(recv_win->mut_); recv_win->requests_->push_back(rreq); rreq->start(); xbt_mutex_release(recv_win->mut_); - //push request to sender's win - xbt_mutex_acquire(mut_); - requests_->push_back(sreq); - xbt_mutex_release(mut_); + }else{ Datatype::copy(origin_addr, origin_count, origin_datatype, recv_addr, target_count, target_datatype); + if(request!=nullptr) + *request = MPI_REQUEST_NULL; } return MPI_SUCCESS; } int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype) + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request) { //get sender pointer MPI_Win send_win = connected_wins_[target_rank]; @@ -271,12 +279,19 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, //start recv rreq->start(); - //push request to sender's win - xbt_mutex_acquire(mut_); - requests_->push_back(rreq); - xbt_mutex_release(mut_); + + if(request!=nullptr){ + *request=rreq; + }else{ + xbt_mutex_acquire(mut_); + requests_->push_back(rreq); + xbt_mutex_release(mut_); + } + }else{ Datatype::copy(send_addr, target_count, target_datatype, origin_addr, origin_count, origin_datatype); + if(request!=nullptr) + *request=MPI_REQUEST_NULL; } return MPI_SUCCESS; @@ -284,7 +299,7 @@ int Win::get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op) + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request) { //get receiver pointer @@ -325,17 +340,21 @@ int Win::accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_da recv_win->requests_->push_back(rreq); rreq->start(); xbt_mutex_release(recv_win->mut_); - //push request to sender's win - xbt_mutex_acquire(mut_); - requests_->push_back(sreq); - xbt_mutex_release(mut_); + + if(request!=nullptr){ + *request=sreq; + }else{ + xbt_mutex_acquire(mut_); + requests_->push_back(sreq); + xbt_mutex_release(mut_); + } return MPI_SUCCESS; } int Win::get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, - MPI_Datatype target_datatype, MPI_Op op){ + MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request){ //get sender pointer MPI_Win send_win = connected_wins_[target_rank]; diff --git a/src/smpi/smpi_win.hpp b/src/smpi/smpi_win.hpp index a0f9aa77e3..ddbdae45a9 100644 --- a/src/smpi/smpi_win.hpp +++ b/src/smpi/smpi_win.hpp @@ -63,18 +63,19 @@ public: int disp_unit(); int fence(int assert); int put( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype); + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request=nullptr); int get( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype); + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Request* request=nullptr); int accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, - MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op); + MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request=nullptr); int get_accumulate( void *origin_addr, int origin_count, MPI_Datatype origin_datatype, void *result_addr, int result_count, MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp, int target_count, - MPI_Datatype target_datatype, MPI_Op op); + MPI_Datatype target_datatype, MPI_Op op, MPI_Request* request=nullptr); int compare_and_swap(void *origin_addr, void *compare_addr, void *result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp); static Win* f2c(int id); + int lock(int lock_type, int rank, int assert); int unlock(int rank); int lock_all(int assert); -- 2.20.1