From: degomme Date: Thu, 12 Apr 2018 10:26:25 +0000 (+0200) Subject: Basic implementation of MPI_Cancel X-Git-Tag: v3.20~461 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/51fbc81e5efb63f45ce68952b8d8c1a03730022f Basic implementation of MPI_Cancel --- diff --git a/include/smpi/mpif.h.in b/include/smpi/mpif.h.in index f346bb3e56..2a435903ad 100644 --- a/include/smpi/mpif.h.in +++ b/include/smpi/mpif.h.in @@ -113,7 +113,7 @@ ! This should be equal to the number of int fields in MPI_Status integer MPI_STATUS_SIZE - parameter(MPI_STATUS_SIZE=4) + parameter(MPI_STATUS_SIZE=5) integer MPI_INTEGER_KIND parameter(MPI_INTEGER_KIND=4) diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h index 4ef2ae6354..e9fa18d799 100644 --- a/include/smpi/smpi.h +++ b/include/smpi/smpi.h @@ -215,6 +215,7 @@ typedef struct { int MPI_TAG; int MPI_ERROR; int count; + int cancelled; } MPI_Status; typedef SMPI_Win* MPI_Win; diff --git a/src/smpi/bindings/smpi_mpi.cpp b/src/smpi/bindings/smpi_mpi.cpp index a708a8df5f..963409c365 100644 --- a/src/smpi/bindings/smpi_mpi.cpp +++ b/src/smpi/bindings/smpi_mpi.cpp @@ -259,8 +259,8 @@ WRAPPED_PMPI_CALL(MPI_Info, MPI_Info_f2c,(MPI_Fint info),(info)) WRAPPED_PMPI_CALL(MPI_Op, MPI_Op_f2c,(MPI_Fint op),(op)) WRAPPED_PMPI_CALL(MPI_Request, MPI_Request_f2c,(MPI_Fint request),(request)) WRAPPED_PMPI_CALL(MPI_Win, MPI_Win_f2c,(MPI_Fint win),(win)) - - +WRAPPED_PMPI_CALL(int,MPI_Cancel,(MPI_Request* request) ,(request)) +WRAPPED_PMPI_CALL(int, MPI_Test_cancelled,(MPI_Status* status, int* flag) ,(status, flag)) /* Unimplemented Calls - both PMPI and MPI calls are generated. When implementing, please move ahead, swap UNIMPLEMENTED_WRAPPED_PMPI_CALL for WRAPPED_PMPI_CALL, @@ -276,7 +276,6 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Bsend_init,(void* buf, int count, MPI_Da UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Bsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) ,(buf, count, datatype, dest, tag, comm)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Buffer_attach,(void* buffer, int size) ,(buffer, size)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Buffer_detach,(void* buffer, int* size) ,(buffer, size)) -UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Cancel,(MPI_Request* request) ,(request)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Cart_map,(MPI_Comm comm_old, int ndims, int* dims, int* periods, int* newrank) ,(comm_old, ndims, dims, periods, newrank)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Close_port,( char *port_name),( port_name)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Comm_accept,( char *port_name, MPI_Info info, int root, MPI_Comm comm, MPI_Comm *newcomm),( port_name, info, root, comm, newcomm)) @@ -387,7 +386,6 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend_init,(void* buf, int count, MPI_Da UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Rsend,(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) ,(buf, count, datatype, dest, tag, comm)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Status_set_cancelled,(MPI_Status *status,int flag),(status,flag)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Status_set_elements,( MPI_Status *status, MPI_Datatype datatype, int count),( status, datatype, count)) -UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_Test_cancelled,(MPI_Status* status, int* flag) ,(status, flag)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Topo_test,(MPI_Comm comm, int* top_type) ,(comm, top_type)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_create_darray,(int size, int rank, int ndims, int* array_of_gsizes, int* array_of_distribs, int* array_of_dargs, int* array_of_psizes,int order, MPI_Datatype oldtype, MPI_Datatype *newtype) ,(size, rank, ndims, array_of_gsizes,array_of_distribs, array_of_dargs, array_of_psizes,order,oldtype, newtype)) UNIMPLEMENTED_WRAPPED_PMPI_CALL(int,MPI_Type_get_contents,(MPI_Datatype datatype, int max_integers, int max_addresses, int max_datatypes, int* array_of_integers, MPI_Aint* array_of_addresses, MPI_Datatype *array_of_datatypes),(datatype, max_integers, max_addresses,max_datatypes, array_of_integers, array_of_addresses, array_of_datatypes)) diff --git a/src/smpi/bindings/smpi_pmpi_request.cpp b/src/smpi/bindings/smpi_pmpi_request.cpp index 8df9c83c4e..99c0ec27ad 100644 --- a/src/smpi/bindings/smpi_pmpi_request.cpp +++ b/src/smpi/bindings/smpi_pmpi_request.cpp @@ -680,6 +680,30 @@ int PMPI_Testsome(int incount, MPI_Request requests[], int* outcount, int* indic return retval; } +int PMPI_Cancel(MPI_Request* request) +{ + int retval = 0; + + smpi_bench_end(); + if (*request == MPI_REQUEST_NULL) { + retval = MPI_ERR_REQUEST; + } else { + (*request)->cancel(); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} + +int PMPI_Test_cancelled(MPI_Status* status, int* flag){ + if(status==MPI_STATUS_IGNORE){ + *flag=0; + return MPI_ERR_ARG; + } + *flag=simgrid::smpi::Status::cancelled(status); + return MPI_SUCCESS; +} + MPI_Request PMPI_Request_f2c(MPI_Fint request){ return static_cast(simgrid::smpi::Request::f2c(request)); } diff --git a/src/smpi/include/smpi_request.hpp b/src/smpi/include/smpi_request.hpp index 452494740c..7d80fe8191 100644 --- a/src/smpi/include/smpi_request.hpp +++ b/src/smpi/include/smpi_request.hpp @@ -38,6 +38,7 @@ class Request : public F2C { MPI_Request detached_sender_; int refcount_; MPI_Op op_; + int cancelled_; public: Request()=default; Request(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags); @@ -51,6 +52,7 @@ class Request : public F2C { int detached(); void print_request(const char *message); void start(); + void cancel(); static void finish_wait(MPI_Request* request, MPI_Status * status); static void unref(MPI_Request* request); diff --git a/src/smpi/include/smpi_status.hpp b/src/smpi/include/smpi_status.hpp index 8825cd51a1..43ec96631d 100644 --- a/src/smpi/include/smpi_status.hpp +++ b/src/smpi/include/smpi_status.hpp @@ -17,6 +17,7 @@ namespace smpi{ class Status{ public: static void empty(MPI_Status * status); +static int cancelled (MPI_Status * status); static int get_count(MPI_Status * status, MPI_Datatype datatype); }; diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index 92551014f5..07b5e952f6 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -16,6 +16,7 @@ #include "src/mc/mc_replay.hpp" #include "src/simix/ActorImpl.hpp" #include "xbt/config.hpp" +#include #include @@ -67,6 +68,7 @@ Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst, else refcount_ = 0; op_ = MPI_REPLACE; + cancelled_ = 0; } MPI_Comm Request::comm(){ @@ -139,6 +141,8 @@ int Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl* i ref->truncated_ = 1; if(req->detached_==1) ref->detached_sender_=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver + if(req->cancelled_==0) + req->cancelled_=-1;//mark as uncancellable XBT_DEBUG("match succeeded"); return 1; }else return 0; @@ -162,6 +166,8 @@ int Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl* i req->truncated_ = 1; if(ref->detached_==1) req->detached_sender_=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver + if(req->cancelled_==0) + req->cancelled_=-1;//mark as uncancellable XBT_DEBUG("match succeeded"); return 1; } else @@ -501,6 +507,14 @@ void Request::startall(int count, MPI_Request * requests) } } +void Request::cancel() +{ + if(cancelled_!=-1) + cancelled_=1; + if (this->action_ != nullptr) + (boost::static_pointer_cast(this->action_))->cancel(); +} + int Request::test(MPI_Request * request, MPI_Status * status) { //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before) // to avoid deadlocks if used as a break condition, such as @@ -514,8 +528,13 @@ int Request::test(MPI_Request * request, MPI_Status * status) { Status::empty(status); int flag = 1; if (((*request)->flags_ & PREPARED) == 0) { - if ((*request)->action_ != nullptr) - flag = simcall_comm_test((*request)->action_); + if ((*request)->action_ != nullptr){ + try{ + flag = simcall_comm_test((*request)->action_); + }catch (xbt_ex& e) { + return 0; + } + } if (flag) { finish_wait(request,status); nsleeps=1;//reset the number of sleeps we will do next time @@ -576,8 +595,12 @@ int Request::testany(int count, MPI_Request requests[], int *index, MPI_Status * static int nsleeps = 1; if(smpi_test_sleep > 0) simcall_process_sleep(nsleeps*smpi_test_sleep); - - i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches! + try{ + i = simcall_comm_testany(comms.data(), comms.size()); // The i-th element in comms matches! + }catch (xbt_ex& e) { + return 0; + } + if (i != -1) { // -1 is not MPI_UNDEFINED but a SIMIX return code. (nothing matches) *index = map[i]; finish_wait(&requests[*index],status); @@ -689,6 +712,12 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status) { MPI_Request req = *request; Status::empty(status); + + if (req->cancelled_==1){ + if (status!=MPI_STATUS_IGNORE) + status->cancelled=1; + return; + } if (not((req->detached_ != 0) && ((req->flags_ & SEND) != 0)) && ((req->flags_ & PREPARED) == 0)) { if(status != MPI_STATUS_IGNORE) { @@ -758,9 +787,15 @@ void Request::wait(MPI_Request * request, MPI_Status * status) return; } - if ((*request)->action_ != nullptr) - // this is not a detached send - simcall_comm_wait((*request)->action_, -1.0); + if ((*request)->action_ != nullptr){ + try{ + // this is not a detached send + simcall_comm_wait((*request)->action_, -1.0); + }catch (xbt_ex& e) { + XBT_VERB("Request cancelled"); + } + } + finish_wait(request,status); if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & NON_PERSISTENT)!=0)) @@ -802,7 +837,14 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) } if (size > 0) { XBT_DEBUG("Enter waitany for %lu comms", xbt_dynar_length(&comms)); - int i = simcall_comm_waitany(&comms, -1); + int i=MPI_UNDEFINED; + try{ + // this is not a detached send + i = simcall_comm_waitany(&comms, -1); + }catch (xbt_ex& e) { + XBT_INFO("request %d cancelled ",i); + return i; + } // not MPI_UNDEFINED, as this is a simix return code if (i != -1) { @@ -856,6 +898,7 @@ int Request::waitall(int count, MPI_Request requests[], MPI_Status status[]) index = c; } else { index = waitany(count, (MPI_Request*)requests, pstat); + if (index == MPI_UNDEFINED) break; diff --git a/src/smpi/mpi/smpi_status.cpp b/src/smpi/mpi/smpi_status.cpp index c0c5f4bd0e..ed6bf53d24 100644 --- a/src/smpi/mpi/smpi_status.cpp +++ b/src/smpi/mpi/smpi_status.cpp @@ -18,9 +18,15 @@ void Status::empty(MPI_Status * status) status->MPI_TAG = MPI_ANY_TAG; status->MPI_ERROR = MPI_SUCCESS; status->count=0; + status->cancelled=0; } } +int Status::cancelled(MPI_Status * status) +{ + return status->cancelled!=0; +} + int Status::get_count(MPI_Status * status, MPI_Datatype datatype) { return status->count / datatype->size();