X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/0ca1291c2f63ca6235f4b4885c48413a900aade4..de05fff4ce289c10aea0f0c80476809e4091a247:/src/gras/Msg/gras_msg_exchange.c diff --git a/src/gras/Msg/gras_msg_exchange.c b/src/gras/Msg/gras_msg_exchange.c index e1577320ac..34915867ef 100644 --- a/src/gras/Msg/gras_msg_exchange.c +++ b/src/gras/Msg/gras_msg_exchange.c @@ -8,6 +8,7 @@ #include "xbt/ex.h" #include "xbt/ex_interface.h" +#include "xbt/socket.h" #include "gras/Msg/msg_private.h" #include "gras/Virtu/virtu_interface.h" @@ -35,7 +36,7 @@ const char *e_gras_msg_kind_names[e_gras_msg_kind_count] = void gras_msg_wait_ext_(double timeout, gras_msgtype_t msgt_want, - gras_socket_t expe_want, + xbt_socket_t expe_want, gras_msg_filter_t filter, void *filter_ctx, gras_msg_t msg_got) { @@ -54,8 +55,8 @@ gras_msg_wait_ext_(double timeout, xbt_dynar_foreach(pd->msg_waitqueue, cpt, msg) { if ((!msgt_want || (msg.type->code == msgt_want->code)) - && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe), - gras_socket_peer_name(expe_want)))) + && (!expe_want || (!strcmp(xbt_socket_peer_name(msg.expe), + xbt_socket_peer_name(expe_want)))) && (!filter || filter(&msg, filter_ctx))) { memcpy(msg_got, &msg, sizeof(s_gras_msg_t)); @@ -67,8 +68,8 @@ gras_msg_wait_ext_(double timeout, xbt_dynar_foreach(pd->msg_queue, cpt, msg) { if ((!msgt_want || (msg.type->code == msgt_want->code)) - && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe), - gras_socket_peer_name(expe_want)))) + && (!expe_want || (!strcmp(xbt_socket_peer_name(msg.expe), + xbt_socket_peer_name(expe_want)))) && (!filter || filter(&msg, filter_ctx))) { memcpy(msg_got, &msg, sizeof(s_gras_msg_t)); @@ -107,8 +108,8 @@ gras_msg_wait_ext_(double timeout, XBT_DEBUG("Got a message from the socket"); if ((!msgt_want || (msg.type->code == msgt_want->code)) - && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe), - gras_socket_peer_name(expe_want)))) + && (!expe_want || (!strcmp(xbt_socket_peer_name(msg.expe), + xbt_socket_peer_name(expe_want)))) && (!filter || filter(&msg, filter_ctx))) { memcpy(msg_got, &msg, sizeof(s_gras_msg_t)); @@ -135,8 +136,8 @@ gras_msg_wait_ext_(double timeout, * * @param timeout: How long should we wait for this message. * @param msgt_want: type of awaited msg - * @param[out] expeditor: where to create a socket to answer the incomming message - * @param[out] payload: where to write the payload of the incomming message + * @param[out] expeditor: where to create a socket to answer the incoming message + * @param[out] payload: where to write the payload of the incoming message * @return the error code (or no_error). * * Every message of another type received before the one waited will be queued @@ -145,7 +146,7 @@ gras_msg_wait_ext_(double timeout, void gras_msg_wait_(double timeout, gras_msgtype_t msgt_want, - gras_socket_t * expeditor, void *payload) + xbt_socket_t * expeditor, void *payload) { s_gras_msg_t msg; @@ -187,7 +188,7 @@ static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx) * @param msgt_want: a dynar containing all accepted message type * @param[out] ctx: the context of received message (in case it's a RPC call we want to answer to) * @param[out] msgt_got: indice in the dynar of the type of the received message - * @param[out] payload: where to write the payload of the incomming message + * @param[out] payload: where to write the payload of the incoming message * @return the error code (or no_error). * * Every message of a type not in the accepted list received before the one @@ -230,7 +231,7 @@ void gras_msg_wait_or(double timeout, /** \brief Send the data pointed by \a payload as a message of type * \a msgtype to the peer \a sock */ -void gras_msg_send_(gras_socket_t sock, gras_msgtype_t msgtype, +void gras_msg_send_(xbt_socket_t sock, gras_msgtype_t msgtype, void *payload) { @@ -281,7 +282,7 @@ void gras_msg_handleall(double period) } while (period - now + begin > 0); } -/** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds) +/** @brief Handle an incoming message or timer (or wait up to \a timeOut seconds) * * @param timeOut: How long to wait for incoming messages (in seconds) * @return the error code (or no_error). @@ -290,13 +291,13 @@ void gras_msg_handleall(double period) * * @sa gras_msg_handleall(). */ -void gras_msg_handle(double timeOut) +void gras_msg_handle(volatile double timeOut) { double untiltimer; unsigned int cpt; - int volatile ran_ok; + volatile int ran_ok; s_gras_msg_t msg; @@ -306,7 +307,7 @@ void gras_msg_handle(double timeOut) gras_msg_cb_t cb; s_gras_msg_cb_ctx_t ctx; - int timerexpected, timeouted; + volatile int timerexpected, timeouted; xbt_ex_t e; XBT_VERB("Handling message within the next %.2fs", timeOut); @@ -326,7 +327,7 @@ void gras_msg_handle(double timeOut) /* get a message (from the queue or from the net) */ timeouted = 0; - if (xbt_dynar_length(pd->msg_queue)) { + if (!xbt_dynar_is_empty(pd->msg_queue)) { XBT_DEBUG("Get a message from the queue"); xbt_dynar_shift(pd->msg_queue, &msg); } else { @@ -382,7 +383,7 @@ void gras_msg_handle(double timeOut) XBT_INFO ("No callback for message '%s' (type:%s) from %s:%d. Queue it for later gras_msg_wait() use.", msg.type->name, e_gras_msg_kind_names[msg.kind], - gras_socket_peer_name(msg.expe), gras_socket_peer_port(msg.expe)); + xbt_socket_peer_name(msg.expe), xbt_socket_peer_port(msg.expe)); xbt_dynar_push(pd->msg_waitqueue, &msg); return; /* FIXME: maybe we should call ourselves again until the end of the timer or a proper msg is got */ } @@ -398,16 +399,18 @@ void gras_msg_handle(double timeOut) ran_ok = 0; TRY { xbt_dynar_foreach(list->cbs, cpt, cb) { + volatile unsigned int cpt2 = cpt; if (!ran_ok) { XBT_DEBUG - ("Use the callback #%d (@%p) for incomming msg '%s' (payload_size=%d)", + ("Use the callback #%u (@%p) for incoming msg '%s' (payload_size=%d)", cpt + 1, cb, msg.type->name, msg.payl_size); - if (!(*cb) (&ctx, msg.payl)) { + if (!cb(&ctx, msg.payl)) { /* cb handled the message */ free(msg.payl); ran_ok = 1; } } + cpt = cpt2; } } CATCH(e) { @@ -424,11 +427,11 @@ void gras_msg_handle(double timeOut) e.host = (char *) gras_os_myname(); xbt_ex_setup_backtrace(&e); } - XBT_INFO + XBT_VERB ("Propagate %s exception ('%s') from '%s' RPC cb back to %s:%d", (e.remote ? "remote" : "local"), e.msg, msg.type->name, - gras_socket_peer_name(msg.expe), - gras_socket_peer_port(msg.expe)); + xbt_socket_peer_name(msg.expe), + xbt_socket_peer_port(msg.expe)); if (XBT_LOG_ISENABLED(gras_msg, xbt_log_priority_verbose)) xbt_ex_display(&e); gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror, @@ -439,7 +442,7 @@ void gras_msg_handle(double timeOut) ran_ok = 1; } else { RETHROWF - ("Callback #%d (@%p) to message '%s' (payload size: %d) raised an exception: %s", + ("Callback #%u (@%p) to message '%s' (payload size: %d) raised an exception: %s", cpt + 1, cb, msg.type->name, msg.payl_size); } } @@ -456,29 +459,29 @@ void gras_msg_handle(double timeOut) THROWF(mismatch_error, 0, "Message '%s' refused by all registered callbacks (maybe your callback misses a 'return 0' at the end)", msg.type->name); - /* FIXME: gras_datadesc_free not implemented => leaking the payload */ + /* FIXME: xbt_datadesc_free not implemented => leaking the payload */ break; case e_gras_msg_kind_rpcanswer: XBT_INFO("Unexpected RPC answer discarded (type: %s; from:%s:%d)", - msg.type->name, gras_socket_peer_name(msg.expe), - gras_socket_peer_port(msg.expe)); + msg.type->name, xbt_socket_peer_name(msg.expe), + xbt_socket_peer_port(msg.expe)); XBT_WARN - ("FIXME: gras_datadesc_free not implemented => leaking the payload"); + ("FIXME: xbt_datadesc_free not implemented => leaking the payload"); return; case e_gras_msg_kind_rpcerror: XBT_INFO("Unexpected RPC error discarded (type: %s; from:%s:%d)", - msg.type->name, gras_socket_peer_name(msg.expe), - gras_socket_peer_port(msg.expe)); + msg.type->name, xbt_socket_peer_name(msg.expe), + xbt_socket_peer_port(msg.expe)); XBT_WARN - ("FIXME: gras_datadesc_free not implemented => leaking the payload"); + ("FIXME: xbt_datadesc_free not implemented => leaking the payload"); return; default: THROWF(unknown_error, 0, - "Cannot handle messages of kind %d yet", msg.type->kind); + "Cannot handle messages of kind %d yet", (int)msg.type->kind); } }