From d672a6447aca931be2b670c5c5e177013513c7ee Mon Sep 17 00:00:00 2001 From: cristianrosa Date: Tue, 7 Dec 2010 14:02:54 +0000 Subject: [PATCH] Add support for custom communication matching to SIMIX network interface git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@9063 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- include/simix/simix.h | 5 +++- src/gras/Msg/sg_msg.c | 4 +-- src/gras/Transport/transport_plugin_sg.c | 4 +-- src/msg/gos.c | 4 +-- src/msg/msg_mailbox.c | 4 +-- src/simix/network_private.h | 6 ++-- src/simix/smurf_private.h | 5 +++- src/simix/smx_network.c | 37 ++++++++++++++---------- src/simix/smx_smurf.c | 5 +++- src/simix/smx_user.c | 9 ++++-- src/smpi/smpi_base.c | 4 +-- 11 files changed, 54 insertions(+), 33 deletions(-) diff --git a/include/simix/simix.h b/include/simix/simix.h index 8270b261b9..dfe2f250a9 100644 --- a/include/simix/simix.h +++ b/include/simix/simix.h @@ -164,10 +164,13 @@ XBT_PUBLIC(smx_action_t) SIMIX_req_rdv_get_head(smx_rdv_t rdv); XBT_PUBLIC(smx_action_t) SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate, void *src_buff, size_t src_buff_size, + int (*match_fun)(void *, void *), void *data); XBT_PUBLIC(smx_action_t) SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, - size_t * dst_buff_size); + size_t * dst_buff_size, + int (*match_fun)(void *, void *), + void *data); XBT_PUBLIC(void) SIMIX_req_comm_destroy(smx_action_t comm); diff --git a/src/gras/Msg/sg_msg.c b/src/gras/Msg/sg_msg.c index 1f586f36df..59d8cd07d5 100644 --- a/src/gras/Msg/sg_msg.c +++ b/src/gras/Msg/sg_msg.c @@ -183,7 +183,7 @@ gras_msg_t gras_msg_recv_any(void) sock_data->comm_recv = SIMIX_req_comm_irecv(gras_socket_im_the_server(sock) ? sock_data->rdv_server : sock_data->rdv_client, - NULL, 0); + NULL, 0, NULL, NULL); return msg; } @@ -241,7 +241,7 @@ void gras_msg_send_ext(gras_socket_t sock, payload, msg->payl); } - comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, &msg, sizeof(void *), msg); + comm = SIMIX_req_comm_isend(target_rdv, whole_payload_size, -1, &msg, sizeof(void *), NULL, msg); SIMIX_req_comm_wait(comm, -1); VERB0("Message sent (and received)"); diff --git a/src/gras/Transport/transport_plugin_sg.c b/src/gras/Transport/transport_plugin_sg.c index af76cb1e86..05590a6049 100644 --- a/src/gras/Transport/transport_plugin_sg.c +++ b/src/gras/Transport/transport_plugin_sg.c @@ -180,7 +180,7 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self, /* initialize synchronization stuff on the socket */ data->rdv_server = pr->rdv; data->rdv_client = SIMIX_req_rdv_create(NULL); - data->comm_recv = SIMIX_req_comm_irecv(data->rdv_client, NULL, 0); + data->comm_recv = SIMIX_req_comm_irecv(data->rdv_client, NULL, 0, NULL, NULL); /* connect that simulation data to the socket */ sock->data = data; @@ -227,7 +227,7 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self, int port, gras_socket_t s data->client = NULL; data->rdv_server = pr->rdv; data->rdv_client = NULL; - data->comm_recv = SIMIX_req_comm_irecv(pr->rdv, NULL, 0); + data->comm_recv = SIMIX_req_comm_irecv(pr->rdv, NULL, 0, NULL, NULL); sock->data = data; diff --git a/src/msg/gos.c b/src/msg/gos.c index 899b135a5e..45fdce4126 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -414,7 +414,7 @@ msg_comm_t MSG_task_isend(m_task_t task, const char *alias) /* Send it by calling SIMIX network layer */ return SIMIX_req_comm_isend(mailbox, t_simdata->message_size, - t_simdata->rate, task, sizeof(void *), + t_simdata->rate, task, sizeof(void *), NULL, &t_simdata->comm); } @@ -444,7 +444,7 @@ msg_comm_t MSG_task_irecv(m_task_t * task, const char *alias) ("MSG_task_get() was asked to write in a non empty task struct."); /* Try to receive it by calling SIMIX network layer */ - return SIMIX_req_comm_irecv(rdv, task, NULL); + return SIMIX_req_comm_irecv(rdv, task, NULL, NULL, NULL); } /** \ingroup msg_gos_functions diff --git a/src/msg/msg_mailbox.c b/src/msg/msg_mailbox.c index ab9e1ec61d..dda4eb5c19 100644 --- a/src/msg/msg_mailbox.c +++ b/src/msg/msg_mailbox.c @@ -95,7 +95,7 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task, /* Try to receive it by calling SIMIX network layer */ TRY { - comm = SIMIX_req_comm_irecv(mailbox, task, NULL); + comm = SIMIX_req_comm_irecv(mailbox, task, NULL, NULL, NULL); SIMIX_req_comm_wait(comm, timeout); SIMIX_req_comm_destroy(comm); DEBUG2("Got task %s from %p",(*task)->name,mailbox); @@ -162,7 +162,7 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, /* Try to send it by calling SIMIX network layer */ TRY { t_simdata->comm = SIMIX_req_comm_isend(mailbox, t_simdata->message_size, - t_simdata->rate, task, sizeof(void *), task); + t_simdata->rate, task, sizeof(void *), NULL, task); #ifdef HAVE_TRACING SIMIX_req_set_category(t_simdata->comm, task->category); #endif diff --git a/src/simix/network_private.h b/src/simix/network_private.h index 721bc91c21..32b8777832 100644 --- a/src/simix/network_private.h +++ b/src/simix/network_private.h @@ -31,9 +31,11 @@ int SIMIX_rdv_comm_count_by_host(smx_rdv_t rdv, smx_host_t host); smx_action_t SIMIX_rdv_get_head(smx_rdv_t rdv); smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv, double task_size, double rate, - void *src_buff, size_t src_buff_size, void *data); + void *src_buff, size_t src_buff_size, + int (*)(void *, void *), void *data); smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, - void *dst_buff, size_t *dst_buff_size); + void *dst_buff, size_t *dst_buff_size, + int (*)(void *, void *), void *data); void SIMIX_comm_destroy(smx_action_t action); void SIMIX_comm_destroy_internal_actions(smx_action_t action); void SIMIX_pre_comm_wait(smx_req_t req); diff --git a/src/simix/smurf_private.h b/src/simix/smurf_private.h index c031459bf7..8e6575399c 100644 --- a/src/simix/smurf_private.h +++ b/src/simix/smurf_private.h @@ -277,14 +277,17 @@ typedef struct s_smx_req { double rate; void *src_buff; size_t src_buff_size; + int (*match_fun)(void *, void *); void *data; - smx_action_t result; + smx_action_t result; } comm_isend; struct { smx_rdv_t rdv; void *dst_buff; size_t *dst_buff_size; + int (*match_fun)(void *, void *); + void *data; smx_action_t result; } comm_irecv; diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index 47e52a0a5f..f874dcc1f5 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -23,7 +23,8 @@ static XBT_INLINE void SIMIX_comm_wait_for_completion(smx_action_t comm, double timeout); static XBT_INLINE void SIMIX_rdv_push(smx_rdv_t rdv, smx_action_t comm); static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm); -static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type); +static smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type, + int (*match_fun)(void *, void *), void *); static void SIMIX_rdv_free(void *data); void SIMIX_network_init(void) @@ -122,17 +123,19 @@ static XBT_INLINE void SIMIX_rdv_remove(smx_rdv_t rdv, smx_action_t comm) * \param type The type of communication we are looking for (comm_send, comm_recv) * \return The communication request if found, NULL otherwise */ -smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type) -{ - smx_action_t comm = (smx_action_t) - xbt_fifo_get_item_content(xbt_fifo_get_first_item(rdv->comm_fifo)); - - if (comm && comm->comm.type == type) { - DEBUG0("Communication request found!"); - xbt_fifo_shift(rdv->comm_fifo); - comm->comm.refcount++; - comm->comm.rdv = NULL; - return comm; +smx_action_t SIMIX_rdv_get_request(smx_rdv_t rdv, e_smx_comm_type_t type, + int (*match_fun)(void *, void *), void *data) +{ + smx_action_t req; + xbt_fifo_item_t item; + + xbt_fifo_foreach(rdv->comm_fifo, item, req, smx_action_t){ + if(req->comm.type == type && (!match_fun || match_fun(data, req->comm.data))){ + xbt_fifo_remove_item(rdv->comm_fifo, item); + req->comm.refcount++; + req->comm.rdv = NULL; + return req; + } } DEBUG0("Communication request not found"); @@ -232,13 +235,14 @@ void SIMIX_comm_destroy_internal_actions(smx_action_t action) smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv, double task_size, double rate, - void *src_buff, size_t src_buff_size, void *data) + void *src_buff, size_t src_buff_size, + int (*match_fun)(void *, void *), void *data) { smx_action_t action; /* Look for communication request matching our needs. If it is not found then create it and push it into the rendez-vous point */ - action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE); + action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_RECEIVE, match_fun, data); if (!action) { action = SIMIX_comm_new(SIMIX_COMM_SEND); @@ -266,14 +270,15 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv, } smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, - void *dst_buff, size_t *dst_buff_size) + void *dst_buff, size_t *dst_buff_size, + int (*match_fun)(void *, void *), void *data) { smx_action_t action; /* Look for communication request matching our needs. * If it is not found then create it and push it into the rendez-vous point */ - action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND); + action = SIMIX_rdv_get_request(rdv, SIMIX_COMM_SEND, match_fun, data); if (!action) { action = SIMIX_comm_new(SIMIX_COMM_RECEIVE); diff --git a/src/simix/smx_smurf.c b/src/simix/smx_smurf.c index b6c476d4a9..b01ce7b25e 100644 --- a/src/simix/smx_smurf.c +++ b/src/simix/smx_smurf.c @@ -311,6 +311,7 @@ void SIMIX_request_pre(smx_req_t req) req->comm_isend.rate, req->comm_isend.src_buff, req->comm_isend.src_buff_size, + req->comm_isend.match_fun, req->comm_isend.data); SIMIX_request_answer(req); break; @@ -320,7 +321,9 @@ void SIMIX_request_pre(smx_req_t req) req->issuer, req->comm_irecv.rdv, req->comm_irecv.dst_buff, - req->comm_irecv.dst_buff_size); + req->comm_irecv.dst_buff_size, + req->comm_irecv.match_fun, + req->comm_irecv.data); SIMIX_request_answer(req); break; diff --git a/src/simix/smx_user.c b/src/simix/smx_user.c index cb83fd97a4..1280e30074 100644 --- a/src/simix/smx_user.c +++ b/src/simix/smx_user.c @@ -597,7 +597,8 @@ smx_action_t SIMIX_req_rdv_get_head(smx_rdv_t rdv) } smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate, - void *src_buff, size_t src_buff_size, void *data) + void *src_buff, size_t src_buff_size, + int (*match_fun)(void *, void *), void *data) { s_smx_req_t req; @@ -609,13 +610,15 @@ smx_action_t SIMIX_req_comm_isend(smx_rdv_t rdv, double task_size, double rate, req.comm_isend.rate = rate; req.comm_isend.src_buff = src_buff; req.comm_isend.src_buff_size = src_buff_size; + req.comm_isend.match_fun = match_fun; req.comm_isend.data = data; SIMIX_request_push(&req); return req.comm_isend.result; } -smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size) +smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_buff_size, + int (*match_fun)(void *, void *), void *data) { s_smx_req_t req; @@ -625,6 +628,8 @@ smx_action_t SIMIX_req_comm_irecv(smx_rdv_t rdv, void *dst_buff, size_t * dst_bu req.comm_irecv.rdv = rdv; req.comm_irecv.dst_buff = dst_buff; req.comm_irecv.dst_buff_size = dst_buff_size; + req.comm_irecv.match_fun = match_fun; + req.comm_irecv.data = data; SIMIX_request_push(&req); return req.comm_irecv.result; diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index e1c5fef925..0581cfe2e2 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -73,13 +73,13 @@ void smpi_mpi_start(MPI_Request request) smpi_process_post_recv(request); print_request("New recv", request); request->pair = - SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size); + SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size, NULL, NULL); } else { smpi_process_post_send(request->comm, request); // FIXME print_request("New send", request); request->pair = SIMIX_req_comm_isend(request->rdv, request->size, -1.0, - request->buf, request->size, NULL); + request->buf, request->size, NULL, NULL); #ifdef HAVE_TRACING SIMIX_req_set_category (request->pair, TRACE_internal_smpi_get_category()); #endif -- 2.20.1