From 16fdb8c65ac8d5baebc0278c499546c5fb10207b Mon Sep 17 00:00:00 2001 From: mquinson Date: Tue, 25 Oct 2005 19:59:11 +0000 Subject: [PATCH] New function: gras_msg_wait_ext (for a finer control of accepted messages); introduce the message kind concept. Not intended for API exposure, but makes room for a proper support of RPC in gras git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1835 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/gras/Msg/msg.c | 124 +++++++++++++++++++++++++------------ src/gras/Msg/msg_private.h | 35 ++++++++--- src/gras/Msg/rl_msg.c | 38 ++++++------ src/gras/Msg/sg_msg.c | 37 ++++++----- 4 files changed, 151 insertions(+), 83 deletions(-) diff --git a/src/gras/Msg/msg.c b/src/gras/Msg/msg.c index 6281d3922f..a27a327eea 100644 --- a/src/gras/Msg/msg.c +++ b/src/gras/Msg/msg.c @@ -205,30 +205,32 @@ gras_msgtype_t gras_msgtype_by_id(int id) { /** \brief Waits for a message to come in over a given socket. * * @param timeout: How long should we wait for this message. - * @param msgt_want: type of awaited msg - * @param[out] expeditor: where to create a socket to answer the incomming message - * @param[out] payload: where to write the payload of the incomming message - * @return the error code (or no_error). + * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message) + * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant) + * @param payl_filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant) + * @param filter_ctx: context passed as second argument of the filter (a pattern to match?) + * @param[out] msgt_got: where to write the descriptor of the message we got + * @param[out] expe_got: where to create a socket to answer the incomming message + * @param[out] payl_got: where to write the payload of the incomming message * * Every message of another type received before the one waited will be queued * and used by subsequent call to this function or gras_msg_handle(). */ + void -gras_msg_wait(double timeout, - gras_msgtype_t msgt_want, - gras_socket_t *expeditor, - void *payload) { +gras_msg_wait_ext(double timeout, + gras_msgtype_t msgt_want, + gras_socket_t expe_want, + int_f_pvoid_pvoid_t payl_filter, + void *filter_ctx, + gras_msgtype_t *msgt_got, + gras_socket_t *expe_got, + void *payl_got) { - gras_msgtype_t msgt_got; - void *payload_got; - int payload_size_got; + s_gras_msg_t msg; double start, now; gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id); int cpt; - s_gras_msg_t msg; - gras_socket_t expeditor_res = NULL; - - payload_got = NULL; xbt_assert0(msgt_want,"Cannot wait for the NULL message"); @@ -237,11 +239,18 @@ gras_msg_wait(double timeout, start = now = gras_os_time(); xbt_dynar_foreach(pd->msg_queue,cpt,msg){ - if (msg.type->code == msgt_want->code) { - if (expeditor) - *expeditor = msg.expeditor; - memcpy(payload, msg.payload, msg.payload_size); - free(msg.payload); + if ( ( !msgt_want || (msg.type->code == msgt_want->code)) + && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe), + gras_socket_peer_name(expe_want)))) + && (!payl_filter || payl_filter(msg.payl,filter_ctx))) { + + if (expe_got) + *expe_got = msg.expe; + if (msgt_got) + *msgt_got = msg.type; + if (payl_got) + memcpy(payl_got, msg.payl, msg.payl_size); + free(msg.payl); xbt_dynar_cursor_rm(pd->msg_queue, &cpt); VERB0("The waited message was queued"); return; @@ -249,22 +258,27 @@ gras_msg_wait(double timeout, } while (1) { - expeditor_res = gras_trp_select(timeout - now + start); - gras_msg_recv(expeditor_res, &msgt_got, &payload_got, &payload_size_got); - if (msgt_got->code == msgt_want->code) { - if (expeditor) - *expeditor=expeditor_res; - memcpy(payload, payload_got, payload_size_got); - free(payload_got); - VERB0("Got waited message"); + memset(&msg,sizeof(msg),0); + + msg.expe = gras_trp_select(timeout - now + start); + gras_msg_recv(msg.expe, &msg); + + if ( ( !msgt_want || (msg.type->code == msgt_want->code)) + && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe), + gras_socket_peer_name(expe_want)))) + && (!payl_filter || payl_filter(msg.payl,filter_ctx))) { + + if (expe_got) + *expe_got=msg.expe; + if (msgt_got) + *msgt_got = msg.type; + if (payl_got) + memcpy(payl_got, msg.payl, msg.payl_size); + free(msg.payl); return; } /* not expected msg type. Queue it for later */ - msg.expeditor = expeditor_res; - msg.type = msgt_got; - msg.payload = payload; - msg.payload_size = payload_size_got; xbt_dynar_push(pd->msg_queue,&msg); now=gras_os_time(); @@ -276,6 +290,38 @@ gras_msg_wait(double timeout, THROW_IMPOSSIBLE; } +/** \brief Waits for a message to come in over a given socket. + * + * @param timeout: How long should we wait for this message. + * @param msgt_want: type of awaited msg + * @param[out] expeditor: where to create a socket to answer the incomming message + * @param[out] payload: where to write the payload of the incomming message + * @return the error code (or no_error). + * + * Every message of another type received before the one waited will be queued + * and used by subsequent call to this function or gras_msg_handle(). + */ +void +gras_msg_wait(double timeout, + gras_msgtype_t msgt_want, + gras_socket_t *expeditor, + void *payload) { + + return gras_msg_wait_ext(timeout, + msgt_want, NULL, NULL, NULL, + NULL, expeditor, payload); +} + + +/** \brief Send the data pointed by \a payload as a message of type + * \a msgtype to the peer \a sock */ +void +gras_msg_send(gras_socket_t sock, + gras_msgtype_t msgtype, + void *payload) { + + gras_msg_send_ext(sock, e_gras_msg_kind_oneway, msgtype, payload); +} /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds) * @@ -294,8 +340,7 @@ gras_msg_handle(double timeOut) { s_gras_msg_t msg; gras_socket_t expeditor=NULL; void *payload=NULL; - int payload_size; - gras_msgtype_t msgtype; + gras_msgtype_t msgtype=NULL; gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id); gras_cblist_t *list=NULL; @@ -324,9 +369,9 @@ gras_msg_handle(double timeOut) { if (xbt_dynar_length(pd->msg_queue)) { DEBUG0("Get a message from the queue"); xbt_dynar_shift(pd->msg_queue,&msg); - expeditor = msg.expeditor; + expeditor = msg.expe; msgtype = msg.type; - payload = msg.payload; + payload = msg.payl; } else { TRY { expeditor = gras_trp_select(timeOut); @@ -339,9 +384,12 @@ gras_msg_handle(double timeOut) { if (!timeouted) { TRY { - gras_msg_recv(expeditor, &msgtype, &payload, &payload_size); + /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */ + gras_msg_recv(expeditor, &msg); + msgtype = msg.type; + payload = msg.payl; } CATCH(e) { - RETHROW1("Error caught while receiving a message on select()ed socket %p: %s", + RETHROW1("Error caught while receiving a message on select()ed socket %p: %s", expeditor); } } diff --git a/src/gras/Msg/msg_private.h b/src/gras/Msg/msg_private.h index 20f28c17c3..a14458081b 100644 --- a/src/gras/Msg/msg_private.h +++ b/src/gras/Msg/msg_private.h @@ -32,12 +32,28 @@ extern char _GRAS_header[6]; extern int gras_msg_libdata_id; /* The identifier of our libdata */ +typedef enum { + e_gras_msg_kind_unknown = 0, + e_gras_msg_kind_oneway = 1 + /* future: + method call (answer expected; sessionID attached) + successful return (usual datatype attached, with sessionID) + error return (payload = exception) + [+ call cancel, and others] + even after: + forwarding request and other application level routing stuff + group communication + */ + +} e_gras_msg_kind_t; + /** @brief Message instance */ typedef struct { - gras_socket_t expeditor; - gras_msgtype_t type; - void *payload; - int payload_size; + gras_socket_t expe; + e_gras_msg_kind_t kind; + gras_msgtype_t type; + void *payl; + int payl_size; } s_gras_msg_t, *gras_msg_t; /** @@ -60,10 +76,13 @@ extern xbt_set_t _gras_msgtype_set; /* of gras_msgtype_t */ void gras_msgtype_free(void *msgtype); -void gras_msg_recv(gras_socket_t sock, - gras_msgtype_t *msgtype, - void **payload, - int *payload_size); +/* functions to extract msg from socket or put it on wire (depend RL vs SG) */ +void gras_msg_recv(gras_socket_t sock, + gras_msg_t msg/*OUT*/); +void gras_msg_send_ext(gras_socket_t sock, + e_gras_msg_kind_t kind, + gras_msgtype_t msgtype, + void *payload); /** * gras_cblist_t: diff --git a/src/gras/Msg/rl_msg.c b/src/gras/Msg/rl_msg.c index 61405d0425..bfbd5e4b15 100644 --- a/src/gras/Msg/rl_msg.c +++ b/src/gras/Msg/rl_msg.c @@ -16,15 +16,14 @@ XBT_LOG_EXTERNAL_CATEGORY(gras_msg); XBT_LOG_DEFAULT_CATEGORY(gras_msg); -/** \brief Send the data pointed by \a payload as a message of type - * \a msgtype to the peer \a sock */ -void -gras_msg_send(gras_socket_t sock, - gras_msgtype_t msgtype, - void *payload) { +void gras_msg_send_ext(gras_socket_t sock, + e_gras_msg_kind_t kind, + gras_msgtype_t msgtype, + void *payload) { static gras_datadesc_type_t string_type=NULL; - + char c_kind=(char)kind; + if (!msgtype) THROW0(arg_error,0, "Cannot send the NULL message (did msgtype_by_name fail?)"); @@ -37,20 +36,20 @@ gras_msg_send(gras_socket_t sock, DEBUG3("send '%s' to %s:%d", msgtype->name, gras_socket_peer_name(sock),gras_socket_peer_port(sock)); gras_trp_send(sock, _GRAS_header, 6, 1 /* stable */); + gras_trp_send(sock, &c_kind, 1, 1 /* stable */); gras_datadesc_send(sock, string_type, &msgtype->name); if (msgtype->ctn_type) gras_datadesc_send(sock, msgtype->ctn_type, payload); gras_trp_flush(sock); } + /* * receive the next message on the given socket. */ void gras_msg_recv(gras_socket_t sock, - gras_msgtype_t *msgtype, - void **payload, - int *payload_size) { + gras_msg_t msg) { xbt_ex_t e; static gras_datadesc_type_t string_type=NULL; @@ -58,6 +57,7 @@ gras_msg_recv(gras_socket_t sock, int cpt; int r_arch; char *msg_name=NULL; + char c_kind; xbt_assert1(!gras_socket_is_meas(sock), "Asked to receive a message on the measurement socket %p", sock); @@ -68,6 +68,8 @@ gras_msg_recv(gras_socket_t sock, TRY { gras_trp_recv(sock, header, 6); + gras_trp_recv(sock, &c_kind, 1); + msg->kind=(e_gras_msg_kind_t)c_kind; } CATCH(e) { RETHROW1("Exception caught while trying to get the mesage header on socket %p: %s", sock); @@ -86,7 +88,7 @@ gras_msg_recv(gras_socket_t sock, gras_datadesc_recv(sock, string_type, r_arch, &msg_name); TRY { - *msgtype = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,msg_name); + msg->type = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,msg_name); } CATCH(e) { /* FIXME: Survive unknown messages */ RETHROW1("Exception caught while retrieving the type associated to messages '%s' : %s", @@ -94,16 +96,16 @@ gras_msg_recv(gras_socket_t sock, } free(msg_name); - if ((*msgtype)->ctn_type) { - *payload_size=gras_datadesc_size((*msgtype)->ctn_type); - xbt_assert2(*payload_size > 0, + if (msg->type->ctn_type) { + msg->payl_size=gras_datadesc_size(msg->type->ctn_type); + xbt_assert2(msg->payl_size > 0, "%s %s", "Dynamic array as payload is forbided for now (FIXME?).", "Reference to dynamic array is allowed."); - *payload = xbt_malloc(*payload_size); - gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload); + msg->payl = xbt_malloc(msg->payl_size); + gras_datadesc_recv(sock, msg->type->ctn_type, r_arch, msg->payl); } else { - *payload = NULL; - *payload_size = 0; + msg->payl = NULL; + msg->payl_size = 0; } } diff --git a/src/gras/Msg/sg_msg.c b/src/gras/Msg/sg_msg.c index adaa618c15..93a9ae18d3 100644 --- a/src/gras/Msg/sg_msg.c +++ b/src/gras/Msg/sg_msg.c @@ -17,12 +17,10 @@ #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */ #include "gras/Transport/transport_private.h" /* sock->data */ -/** \brief Send the data pointed by \a payload as a message of type - * \a msgtype to the peer \a sock */ -void -gras_msg_send(gras_socket_t sock, - gras_msgtype_t msgtype, - void *payload) { +void gras_msg_send_ext(gras_socket_t sock, + e_gras_msg_kind_t kind, + gras_msgtype_t msgtype, + void *payload) { m_task_t task=NULL; gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data; @@ -37,13 +35,14 @@ gras_msg_send(gras_socket_t sock, msg->type=msgtype; - msg->payload_size=gras_datadesc_size(msgtype->ctn_type); - msg->payload=xbt_malloc(gras_datadesc_size(msgtype->ctn_type)); + msg->payl_size=gras_datadesc_size(msgtype->ctn_type); + msg->payl=xbt_malloc(msg->payl_size); if (msgtype->ctn_type) - whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,payload,msg->payload); + whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,payload,msg->payl); + msg->kind = kind; task=MSG_task_create(msgtype->name,0, - ((double)msg->payload_size)/(1024.0*1024.0),msg); + ((double)whole_payload_size)/(1024.0*1024.0),msg); if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK) THROW0(system_error,0,"Problem during the MSG_task_put"); @@ -54,26 +53,26 @@ gras_msg_send(gras_socket_t sock, */ void gras_msg_recv(gras_socket_t sock, - gras_msgtype_t *msgtype, - void **payload, - int *payload_size) { + gras_msg_t msg) { m_task_t task=NULL; - gras_msg_t msg; + gras_msg_t msg_got; gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_name("gras_trp"); xbt_assert1(!gras_socket_is_meas(sock), "Asked to receive a message on the measurement socket %p", sock); + xbt_assert0(msg,"msg is an out parameter of gras_msg_recv..."); + if (MSG_task_get(&task, pd->chan) != MSG_OK) THROW0(system_error,0,"Error in MSG_task_get()"); - msg = MSG_task_get_data(task); - *msgtype = gras_msgtype_by_id(msg->type->code); - *payload = msg->payload; - *payload_size = msg->payload_size; + msg_got=MSG_task_get_data(task); + + msg_got->expe= msg->expe; + memcpy(msg,msg_got,sizeof(s_gras_msg_t)); - free(msg); + free(msg_got); if (MSG_task_destroy(task) != MSG_OK) THROW0(system_error,0,"Error in MSG_task_destroy()"); } -- 2.20.1