From 15220d2486268d4eac7e25aac434a9139fb2a187 Mon Sep 17 00:00:00 2001 From: Augustin Degomme Date: Thu, 20 Sep 2012 19:00:19 +0200 Subject: [PATCH] add MPI_Probe and MPI_Iprobe support, and better handling of the MPI_Status structure, which will need more testing --- include/simgrid/simix.h | 3 +- include/smpi/smpi.h | 10 +++--- src/simix/smx_network.c | 64 ++++++++++++++++++++++++++++++--- src/simix/smx_network_private.h | 2 ++ src/simix/smx_smurf.c | 11 ++++++ src/simix/smx_smurf_private.h | 10 ++++++ src/simix/smx_user.c | 24 +++++++++++++ src/smpi/private.h | 3 ++ src/smpi/smpi_base.c | 58 +++++++++++++++++++++++++++--- src/smpi/smpi_pmpi.c | 45 +++++++++++++++++++---- 10 files changed, 210 insertions(+), 20 deletions(-) diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index 830b33d7d2..5fe3313051 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -396,7 +396,8 @@ XBT_PUBLIC(smx_action_t) simcall_comm_irecv(smx_rdv_t rdv, void *dst_buff, void *data); XBT_PUBLIC(void) simcall_comm_destroy(smx_action_t comm); - +XBT_PUBLIC(smx_action_t) simcall_comm_iprobe(smx_rdv_t rdv, int src, int tag, + int (*match_fun)(void *, void *, smx_action_t), void *data); XBT_PUBLIC(void) simcall_comm_cancel(smx_action_t comm); /* FIXME: waitany is going to be a vararg function, and should take a timeout */ diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h index e9b100c9b6..f7cad6029d 100644 --- a/include/smpi/smpi.h +++ b/include/smpi/smpi.h @@ -345,6 +345,12 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Alltoallv, void *recvbuf, int *recvcounts, int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm)); +MPI_CALL(XBT_PUBLIC(int), MPI_Iprobe, + (int source, int tag, MPI_Comm comm, + int* flag, MPI_Status* status)); +MPI_CALL(XBT_PUBLIC(int), MPI_Probe, + (int source, int tag, MPI_Comm comm, + MPI_Status* status)); //FIXME: these are not yet implemented typedef void MPI_Handler_function(MPI_Comm*, int*, ...); @@ -398,8 +404,6 @@ MPI_CALL(XBT_PUBLIC(int), 
MPI_Ibsend, (void* buf, int count, MPI_Datatype dataty MPI_CALL(XBT_PUBLIC(int), MPI_Comm_remote_group, (MPI_Comm comm, MPI_Group* group)); MPI_CALL(XBT_PUBLIC(int), MPI_Comm_remote_size, (MPI_Comm comm, int* size)); MPI_CALL(XBT_PUBLIC(int), MPI_Issend, (void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request)); -MPI_CALL(XBT_PUBLIC(int), MPI_Probe, (int source, int tag, MPI_Comm comm, MPI_Status* status)); -MPI_CALL(XBT_PUBLIC(int), MPI_Attr_delete, (MPI_Comm comm, int keyval)); MPI_CALL(XBT_PUBLIC(int), MPI_Attr_get, (MPI_Comm comm, int keyval, void* attr_value, int* flag)); MPI_CALL(XBT_PUBLIC(int), MPI_Attr_put, (MPI_Comm comm, int keyval, void* attr_value)); MPI_CALL(XBT_PUBLIC(int), MPI_Rsend, (void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)); @@ -412,8 +416,6 @@ MPI_CALL(XBT_PUBLIC(int), MPI_Pack, (void* inbuf, int incount, MPI_Datatype type MPI_CALL(XBT_PUBLIC(int), MPI_Testall, (int count, MPI_Request* requests, int* flag, MPI_Status* statuses)); MPI_CALL(XBT_PUBLIC(int), MPI_Get_elements, (MPI_Status* status, MPI_Datatype datatype, int* elements)); MPI_CALL(XBT_PUBLIC(int), MPI_Dims_create, (int nnodes, int ndims, int* dims)); -MPI_CALL(XBT_PUBLIC(int), MPI_Iprobe, (int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status)); -MPI_CALL(XBT_PUBLIC(int), MPI_Initialized, (int* flag)); //FIXME: End of all the not yet implemented stuff // smpi functions diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index a247f086b1..85d4d52439 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -184,6 +184,40 @@ smx_action_t SIMIX_fifo_get_comm(xbt_fifo_t fifo, e_smx_comm_type_t type, } +/** + * \brief Checks if there is a communication action queued in a fifo matching our needs, but leave it there + * \param type The type of communication we are looking for (comm_send, comm_recv) + * \return The communication action if found, NULL otherwise + */ 
+smx_action_t SIMIX_fifo_probe_comm(xbt_fifo_t fifo, e_smx_comm_type_t type, + int (*match_fun)(void *, void *,smx_action_t), + void *this_user_data, smx_action_t my_action) +{ + smx_action_t action; + xbt_fifo_item_t item; + void* other_user_data = NULL; + + xbt_fifo_foreach(fifo, item, action, smx_action_t) { + if (action->comm.type == SIMIX_COMM_SEND) { + other_user_data = action->comm.src_data; + } else if (action->comm.type == SIMIX_COMM_RECEIVE) { + other_user_data = action->comm.dst_data; + } + if (action->comm.type == type && + (!match_fun || match_fun(this_user_data, other_user_data, action)) && + (!action->comm.match_fun || action->comm.match_fun(other_user_data, this_user_data, my_action))) { + XBT_DEBUG("Found a matching communication action %p", action); + action->comm.refcount++; + + return action; + } + XBT_DEBUG("Sorry, communication action %p does not match our needs:" + " its type is %d but we are looking for a comm of type %d (or maybe the filtering didn't match)", + action, (int)action->comm.type, (int)type); + } + XBT_DEBUG("No matching communication action found"); + return NULL; +} /******************************************************************************/ /* Communication Actions */ /******************************************************************************/ @@ -386,14 +420,14 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, other_action->state = SIMIX_DONE; other_action->comm.type = SIMIX_COMM_DONE; other_action->comm.rdv = NULL; - SIMIX_comm_destroy(this_action); - --smx_total_comms; // this creation was a pure waste + //SIMIX_comm_destroy(this_action); + //--smx_total_comms; // this creation was a pure waste //already_received=1; - other_action->comm.refcount--; + //other_action->comm.refcount--; }/*else{ XBT_DEBUG("Not yet finished, we have to wait %d\n", xbt_fifo_size(rdv->comm_fifo)); }*/ - other_action->comm.refcount--; + // other_action->comm.refcount--; SIMIX_comm_destroy(this_action); 
--smx_total_comms; // this creation was a pure waste } @@ -415,6 +449,7 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, --smx_total_comms; // this creation was a pure waste other_action->state = SIMIX_READY; other_action->comm.type = SIMIX_COMM_READY; + // other_action->comm.refcount--; } xbt_fifo_push(dst_proc->comms, other_action); } @@ -442,6 +477,27 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, return other_action; } + +smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src, + int tag, int (*match_fun)(void *, void *, smx_action_t), void *data) +{ + XBT_DEBUG("iprobe from %p %p\n", rdv, rdv->comm_fifo); + smx_action_t this_action = SIMIX_comm_new(SIMIX_COMM_RECEIVE); + + smx_action_t other_action; + if(rdv->permanent_receiver && xbt_fifo_size(rdv->done_comm_fifo)!=0){ + //find a match in the already received fifo + other_action = SIMIX_fifo_probe_comm(rdv->done_comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action); + }else{ + other_action = SIMIX_fifo_probe_comm(rdv->comm_fifo, SIMIX_COMM_SEND, match_fun, data, this_action); + } + if(other_action)other_action->comm.refcount--; + + SIMIX_comm_destroy(this_action); + --smx_total_comms; + return other_action; +} + void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx) { diff --git a/src/simix/smx_network_private.h b/src/simix/smx_network_private.h index 558aa16a0e..ee5d53b293 100644 --- a/src/simix/smx_network_private.h +++ b/src/simix/smx_network_private.h @@ -57,6 +57,8 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, void SIMIX_comm_destroy(smx_action_t action); void SIMIX_comm_destroy_internal_actions(smx_action_t action); void SIMIX_pre_comm_wait(smx_simcall_t simcall, smx_action_t action, double timeout, int idx); +smx_action_t SIMIX_comm_iprobe(smx_process_t dst_proc, smx_rdv_t rdv, int src, + int tag, int (*match_fun)(void *, void *, smx_action_t), void *data); void 
SIMIX_pre_comm_waitany(smx_simcall_t simcall, int idx); void SIMIX_post_comm(smx_action_t action); void SIMIX_pre_comm_test(smx_simcall_t simcall); diff --git a/src/simix/smx_smurf.c b/src/simix/smx_smurf.c index 4a679f81a6..0a8fbca3dd 100644 --- a/src/simix/smx_smurf.c +++ b/src/simix/smx_smurf.c @@ -123,6 +123,17 @@ void SIMIX_simcall_pre(smx_simcall_t simcall, int value) SIMIX_simcall_answer(simcall); break; + case SIMCALL_COMM_IPROBE: + simcall->comm_iprobe.result = SIMIX_comm_iprobe( + simcall->issuer, + simcall->comm_iprobe.rdv, + simcall->comm_iprobe.src, + simcall->comm_iprobe.tag, + simcall->comm_iprobe.match_fun, + simcall->comm_iprobe.data); + SIMIX_simcall_answer(simcall); + break; + case SIMCALL_COMM_DESTROY: SIMIX_comm_destroy(simcall->comm_destroy.comm); SIMIX_simcall_answer(simcall); diff --git a/src/simix/smx_smurf_private.h b/src/simix/smx_smurf_private.h index a269fd55d1..d1116506f7 100644 --- a/src/simix/smx_smurf_private.h +++ b/src/simix/smx_smurf_private.h @@ -65,6 +65,7 @@ SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_WAITANY),\ SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_WAIT),\ SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_TEST),\ SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_TESTANY),\ +SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_IPROBE),\ SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_GET_REMAINS),\ SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_GET_STATE),\ SIMCALL_ENUM_ELEMENT(SIMCALL_COMM_GET_SRC_DATA),\ @@ -391,6 +392,15 @@ typedef struct s_smx_simcall { smx_action_t result; } comm_irecv; + struct { + smx_rdv_t rdv; + int src; + int tag; + int (*match_fun)(void *, void *, smx_action_t); + void *data; + smx_action_t result; + } comm_iprobe; + struct { smx_action_t comm; } comm_destroy; diff --git a/src/simix/smx_user.c b/src/simix/smx_user.c index 8fb5b34e81..54edf9dc64 100644 --- a/src/simix/smx_user.c +++ b/src/simix/smx_user.c @@ -1021,6 +1021,30 @@ smx_action_t simcall_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff SIMIX_simcall_push(simcall->issuer); return simcall->comm_irecv.result; } + + +/** + 
* \ingroup simix_comm_management + */ +smx_action_t simcall_comm_iprobe(smx_rdv_t rdv, int src, int tag, + int (*match_fun)(void *, void *, smx_action_t), void *data) +{ + xbt_assert(rdv, "No rendez-vous point defined for iprobe"); + + smx_simcall_t simcall = SIMIX_simcall_mine(); + + simcall->call = SIMCALL_COMM_IPROBE; + simcall->comm_iprobe.rdv = rdv; + simcall->comm_iprobe.src = src; + simcall->comm_iprobe.match_fun = match_fun; + simcall->comm_iprobe.data = data; + if(MC_IS_ENABLED) /* Initialize result to NULL for snapshot comparison done during simcall */ + simcall->comm_iprobe.result = NULL; + SIMIX_simcall_push(simcall->issuer); + return simcall->comm_iprobe.result; +} + + void simcall_comm_destroy(smx_action_t comm) { xbt_assert(comm, "Invalid parameter"); diff --git a/src/smpi/private.h b/src/smpi/private.h index aafbbae788..f32ddc6834 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -118,6 +118,9 @@ void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype, int smpi_mpi_test(MPI_Request * request, MPI_Status * status); int smpi_mpi_testany(int count, MPI_Request requests[], int *index, MPI_Status * status); +void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status); +MPI_Request smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, + MPI_Status* status); int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype); void smpi_mpi_wait(MPI_Request * request, MPI_Status * status); int smpi_mpi_waitany(int count, MPI_Request requests[], diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index ff3b8554cb..28566f5b11 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -147,10 +147,6 @@ void smpi_mpi_start(MPI_Request request) if (oldbuf) memcpy(request->buf,oldbuf,request->size); XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf); - }else{ - XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf); - mailbox = 
smpi_process_remote_mailbox( - smpi_group_index(smpi_comm_group(request->comm), request->dst)); } request->action = @@ -275,6 +271,10 @@ int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype) static void finish_wait(MPI_Request * request, MPI_Status * status) { MPI_Request req = *request; + // if we have a sender, we should use its data, and not the data from the receive + if((req->action)&& + (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG)) + req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action); if(status != MPI_STATUS_IGNORE) { status->MPI_SOURCE = req->src; @@ -284,6 +284,8 @@ static void finish_wait(MPI_Request * request, MPI_Status * status) // right? status->count = req->size; } + req = *request; + print_request("Finishing", req); if(req->flags & NON_PERSISTENT) { smpi_mpi_request_free(request); @@ -340,6 +342,54 @@ int smpi_mpi_testany(int count, MPI_Request requests[], int *index, return flag; } +void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){ + int flag=0; + //FIXME find another way to avoid busy waiting ? 
+ // the issue here is that we have to wait on a nonexistent comm + MPI_Request request; + while(flag==0){ + request = smpi_mpi_iprobe(source, tag, comm, &flag, status); + XBT_DEBUG("Busy Waiting on probing : %d", flag); + if(!flag) { + smpi_mpi_request_free(&request); + simcall_process_sleep(0.0001); + } + } +} + +MPI_Request smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){ + MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag, + comm, NON_PERSISTENT | RECV); + // behave like a receive, but don't do it + smx_rdv_t mailbox; + + print_request("New iprobe", request); + // We have to test both mailboxes as we don't know if we will receive on one or the other + if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){ + mailbox = smpi_process_mailbox_small(); + request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request); + + } + if (request->action==NULL){ + mailbox = smpi_process_mailbox(); + request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request); + } + + if(request->action){ + MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action); + *flag=true; + if(status != MPI_STATUS_IGNORE) { + status->MPI_SOURCE = req->src; + status->MPI_TAG = req->tag; + status->MPI_ERROR = MPI_SUCCESS; + status->count = req->size; + } + } + else *flag=false; + + return request; +} + void smpi_mpi_wait(MPI_Request * request, MPI_Status * status) { print_request("Waiting", *request); diff --git a/src/smpi/smpi_pmpi.c b/src/smpi/smpi_pmpi.c index 2f6a8c1158..5079400097 100644 --- a/src/smpi/smpi_pmpi.c +++ b/src/smpi/smpi_pmpi.c @@ -1096,6 +1096,43 @@ int PMPI_Testany(int count, MPI_Request requests[], int *index, int *flag, return retval; } + + +int PMPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status* status) { + int retval; + smpi_bench_end(); + + if (status == NULL) { + retval = MPI_ERR_ARG; + }else if 
(comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else { + smpi_mpi_probe(source, tag, comm, status); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} + + +int PMPI_Iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status) { + int retval; + smpi_bench_end(); + + if (flag == NULL) { + retval = MPI_ERR_ARG; + }else if (status == NULL) { + retval = MPI_ERR_ARG; + }else if (comm == MPI_COMM_NULL) { + retval = MPI_ERR_COMM; + } else { + smpi_mpi_iprobe(source, tag, comm, flag, status); + retval = MPI_SUCCESS; + } + smpi_bench_begin(); + return retval; +} + int PMPI_Wait(MPI_Request * request, MPI_Status * status) { int retval; @@ -1942,9 +1979,7 @@ int PMPI_Issend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, return not_yet_implemented(); } -int PMPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status* status) { - return not_yet_implemented(); -} + int PMPI_Attr_delete(MPI_Comm comm, int keyval) { return not_yet_implemented(); @@ -1998,10 +2033,6 @@ int PMPI_Dims_create(int nnodes, int ndims, int* dims) { return not_yet_implemented(); } -int PMPI_Iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status) { - return not_yet_implemented(); -} - int PMPI_Initialized(int* flag) { return not_yet_implemented(); } -- 2.20.1