X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/6760cb07d6b57be16928d95339d71e57c4e24f36..c91b68ade95e42efb7a24f19fb5228bee0b618d0:/src/gras/Msg/gras_msg_exchange.c?ds=sidebyside diff --git a/src/gras/Msg/gras_msg_exchange.c b/src/gras/Msg/gras_msg_exchange.c index ed0b3ddc63..34915867ef 100644 --- a/src/gras/Msg/gras_msg_exchange.c +++ b/src/gras/Msg/gras_msg_exchange.c @@ -8,6 +8,7 @@ #include "xbt/ex.h" #include "xbt/ex_interface.h" +#include "xbt/socket.h" #include "gras/Msg/msg_private.h" #include "gras/Virtu/virtu_interface.h" @@ -16,7 +17,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg); char _GRAS_header[6]; const char *e_gras_msg_kind_names[e_gras_msg_kind_count] = - { "UNKNOWN", "ONEWAY", "RPC call", "RPC answer", "RPC error" }; + { "UNKNOWN", "ONEWAY", "RPC call", "RPC answer", "RPC error" }; /** \brief Waits for a message to come in over a given socket. @@ -35,7 +36,7 @@ const char *e_gras_msg_kind_names[e_gras_msg_kind_count] = void gras_msg_wait_ext_(double timeout, gras_msgtype_t msgt_want, - gras_socket_t expe_want, + xbt_socket_t expe_want, gras_msg_filter_t filter, void *filter_ctx, gras_msg_t msg_got) { @@ -43,37 +44,37 @@ gras_msg_wait_ext_(double timeout, s_gras_msg_t msg; double start, now; gras_msg_procdata_t pd = - (gras_msg_procdata_t) gras_libdata_by_id(gras_msg_libdata_id); + (gras_msg_procdata_t) gras_libdata_by_id(gras_msg_libdata_id); unsigned int cpt; - xbt_assert0(msg_got, "msg_got is an output parameter"); + xbt_assert(msg_got, "msg_got is an output parameter"); start = gras_os_time(); - VERB2("Waiting for message '%s' for %fs", + XBT_VERB("Waiting for message '%s' for %fs", msgt_want ? msgt_want->name : "(any)", timeout); xbt_dynar_foreach(pd->msg_waitqueue, cpt, msg) { if ((!msgt_want || (msg.type->code == msgt_want->code)) - && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe), - gras_socket_peer_name(expe_want)))) + && (!expe_want || (!strcmp(xbt_socket_peer_name(msg.expe), + xbt_socket_peer_name(expe_want)))) && (!filter || filter(&msg, filter_ctx))) { memcpy(msg_got, &msg, sizeof(s_gras_msg_t)); xbt_dynar_cursor_rm(pd->msg_waitqueue, &cpt); - VERB0("The waited message was queued"); + XBT_VERB("The waited message was queued"); return; } } xbt_dynar_foreach(pd->msg_queue, cpt, msg) { if ((!msgt_want || (msg.type->code == msgt_want->code)) - && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe), - gras_socket_peer_name(expe_want)))) + && (!expe_want || (!strcmp(xbt_socket_peer_name(msg.expe), + xbt_socket_peer_name(expe_want)))) && (!filter || filter(&msg, filter_ctx))) { memcpy(msg_got, &msg, sizeof(s_gras_msg_t)); xbt_dynar_cursor_rm(pd->msg_queue, &cpt); - VERB0("The waited message was queued"); + XBT_VERB("The waited message was queued"); return; } } @@ -104,25 +105,25 @@ gras_msg_wait_ext_(double timeout, if (need_restart) goto restart_receive; - DEBUG0("Got a message from the socket"); + XBT_DEBUG("Got a message from the socket"); if ((!msgt_want || (msg.type->code == msgt_want->code)) - && (!expe_want || (!strcmp(gras_socket_peer_name(msg.expe), - gras_socket_peer_name(expe_want)))) + && (!expe_want || (!strcmp(xbt_socket_peer_name(msg.expe), + xbt_socket_peer_name(expe_want)))) && (!filter || filter(&msg, filter_ctx))) { memcpy(msg_got, &msg, sizeof(s_gras_msg_t)); - DEBUG0("Message matches expectations. Use it."); + XBT_DEBUG("Message matches expectations. Use it."); return; } - DEBUG0("Message does not match expectations. Queue it."); + XBT_DEBUG("Message does not match expectations. Queue it."); /* not expected msg type. Queue it for later */ xbt_dynar_push(pd->msg_queue, &msg); now = gras_os_time(); if (now - start + 0.001 > timeout) { - THROW1(timeout_error, now - start + 0.001 - timeout, + THROWF(timeout_error, now - start + 0.001 - timeout, "Timeout while waiting for msg '%s'", msgt_want ? msgt_want->name : "(any)"); } @@ -135,8 +136,8 @@ gras_msg_wait_ext_(double timeout, * * @param timeout: How long should we wait for this message. * @param msgt_want: type of awaited msg - * @param[out] expeditor: where to create a socket to answer the incomming message - * @param[out] payload: where to write the payload of the incomming message + * @param[out] expeditor: where to create a socket to answer the incoming message + * @param[out] payload: where to write the payload of the incoming message * @return the error code (or no_error). * * Every message of another type received before the one waited will be queued @@ -145,19 +146,19 @@ gras_msg_wait_ext_(double timeout, void gras_msg_wait_(double timeout, gras_msgtype_t msgt_want, - gras_socket_t * expeditor, void *payload) + xbt_socket_t * expeditor, void *payload) { s_gras_msg_t msg; gras_msg_wait_ext_(timeout, msgt_want, NULL, NULL, NULL, &msg); if (msgt_want->ctn_type) { - xbt_assert1(payload, - "Message type '%s' convey a payload you must accept", + xbt_assert(payload, + "Message type '%s' convey a payload that you must accept", msgt_want->name); } else { - xbt_assert1(!payload, - "No payload was declared for message type '%s'", + xbt_assert(!payload, + "No payload was declared for message type '%s' (don't expect one)", msgt_want->name); } @@ -175,9 +176,9 @@ static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx) xbt_dynar_t dyn = (xbt_dynar_t) ctx; int res = xbt_dynar_member(dyn, msg->type); if (res) - VERB1("Got matching message (type=%s)", msg->type->name); + XBT_VERB("Got matching message (type=%s)", msg->type->name); else - VERB0("Got message not matching our expectations"); + XBT_VERB("Got message not matching our expectations"); return res; } @@ -187,7 +188,7 @@ static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx) * @param msgt_want: a dynar containing all accepted message type * @param[out] ctx: the context of received message (in case it's a RPC call we want to answer to) * @param[out] msgt_got: indice in the dynar of the type of the received message - * @param[out] payload: where to write the payload of the incomming message + * @param[out] payload: where to write the payload of the incoming message * @return the error code (or no_error). * * Every message of a type not in the accepted list received before the one @@ -198,17 +199,18 @@ static int gras_msg_wait_or_filter(gras_msg_t msg, void *ctx) */ void gras_msg_wait_or(double timeout, xbt_dynar_t msgt_want, - gras_msg_cb_ctx_t * ctx, int *msgt_got, void *payload) + gras_msg_cb_ctx_t * ctx, int *msgt_got, + void *payload) { s_gras_msg_t msg; - VERB1("Wait %f seconds for several message types", timeout); + XBT_VERB("Wait %f seconds for several message types", timeout); gras_msg_wait_ext_(timeout, NULL, NULL, &gras_msg_wait_or_filter, (void *) msgt_want, &msg); if (msg.type->ctn_type) { - xbt_assert1(payload, + xbt_assert(payload, "Message type '%s' convey a payload you must accept", msg.type->name); } @@ -229,23 +231,24 @@ void gras_msg_wait_or(double timeout, /** \brief Send the data pointed by \a payload as a message of type * \a msgtype to the peer \a sock */ -void gras_msg_send_(gras_socket_t sock, gras_msgtype_t msgtype, void *payload) +void gras_msg_send_(xbt_socket_t sock, gras_msgtype_t msgtype, + void *payload) { if (msgtype->ctn_type) { - xbt_assert1(payload, + xbt_assert(payload, "Message type '%s' convey a payload you must provide", msgtype->name); } else { - xbt_assert1(!payload, + xbt_assert(!payload, "No payload was declared for message type '%s'", msgtype->name); } - DEBUG2("Send a oneway message of type '%s'. Payload=%p", + XBT_DEBUG("Send a oneway message of type '%s'. Payload=%p", msgtype->name, payload); gras_msg_send_ext(sock, e_gras_msg_kind_oneway, 0, msgtype, payload); - VERB2("Sent a oneway message of type '%s'. Payload=%p", + XBT_VERB("Sent a oneway message of type '%s'. Payload=%p", msgtype->name, payload); } @@ -272,14 +275,14 @@ void gras_msg_handleall(double period) } CATCH(e) { if (e.category != timeout_error) - RETHROW0("Error while waiting for messages: %s"); + RETHROWF("Error while waiting for messages: %s"); xbt_ex_free(e); } /* Epsilon to avoid numerical stability issues were the waited interval is so small that the global clock cannot notice the increment */ } while (period - now + begin > 0); } -/** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds) +/** @brief Handle an incoming message or timer (or wait up to \a timeOut seconds) * * @param timeOut: How long to wait for incoming messages (in seconds) * @return the error code (or no_error). @@ -288,29 +291,29 @@ void gras_msg_handleall(double period) * * @sa gras_msg_handleall(). */ -void gras_msg_handle(double timeOut) +void gras_msg_handle(volatile double timeOut) { double untiltimer; unsigned int cpt; - int volatile ran_ok; + volatile int ran_ok; s_gras_msg_t msg; gras_msg_procdata_t pd = - (gras_msg_procdata_t) gras_libdata_by_id(gras_msg_libdata_id); + (gras_msg_procdata_t) gras_libdata_by_id(gras_msg_libdata_id); gras_cblist_t *list = NULL; gras_msg_cb_t cb; s_gras_msg_cb_ctx_t ctx; - int timerexpected, timeouted; + volatile int timerexpected, timeouted; xbt_ex_t e; - VERB1("Handling message within the next %.2fs", timeOut); + XBT_VERB("Handling message within the next %.2fs", timeOut); untiltimer = gras_msg_timer_handle(); - DEBUG1("Next timer in %f sec", untiltimer); + XBT_DEBUG("Next timer in %f sec", untiltimer); if (untiltimer == 0.0) { /* A timer was already elapsed and handled */ return; @@ -324,8 +327,8 @@ void gras_msg_handle(double timeOut) /* get a message (from the queue or from the net) */ timeouted = 0; - if (xbt_dynar_length(pd->msg_queue)) { - DEBUG0("Get a message from the queue"); + if (!xbt_dynar_is_empty(pd->msg_queue)) { + XBT_DEBUG("Get a message from the queue"); xbt_dynar_shift(pd->msg_queue, &msg); } else { TRY { @@ -335,7 +338,7 @@ void gras_msg_handle(double timeOut) CATCH(e) { if (e.category != timeout_error) RETHROW; - DEBUG0("Damn. Timeout while getting a message from the queue"); + XBT_DEBUG("Damn. Timeout while getting a message from the queue"); xbt_ex_free(e); timeouted = 1; } @@ -350,19 +353,19 @@ void gras_msg_handle(double timeOut) /* we served a timer, we're done */ return; } else { - xbt_assert1(untiltimer > 0, "Negative timer (%f). I'm 'puzzeled'", + xbt_assert(untiltimer > 0, "Negative timer (%f). I'm 'puzzeled'", untiltimer); - WARN1 - ("No timer elapsed, in contrary to expectations (next in %f sec)", - untiltimer); - THROW1(timeout_error, 0, + XBT_WARN + ("No timer elapsed, in contrary to expectations (next in %f sec)", + untiltimer); + THROWF(timeout_error, 0, "No timer elapsed, in contrary to expectations (next in %f sec)", untiltimer); } } else { /* select timeouted, and no timer elapsed. Nothing to do */ - THROW1(timeout_error, 0, "No new message or timer (delay was %f)", + THROWF(timeout_error, 0, "No new message or timer (delay was %f)", timeOut); } @@ -377,10 +380,10 @@ void gras_msg_handle(double timeOut) } } if (!list) { - INFO4 - ("No callback for message '%s' (type:%s) from %s:%d. Queue it for later gras_msg_wait() use.", - msg.type->name, e_gras_msg_kind_names[msg.kind], - gras_socket_peer_name(msg.expe), gras_socket_peer_port(msg.expe)); + XBT_INFO + ("No callback for message '%s' (type:%s) from %s:%d. Queue it for later gras_msg_wait() use.", + msg.type->name, e_gras_msg_kind_names[msg.kind], + xbt_socket_peer_name(msg.expe), xbt_socket_peer_port(msg.expe)); xbt_dynar_push(pd->msg_waitqueue, &msg); return; /* FIXME: maybe we should call ourselves again until the end of the timer or a proper msg is got */ } @@ -396,16 +399,18 @@ void gras_msg_handle(double timeOut) ran_ok = 0; TRY { xbt_dynar_foreach(list->cbs, cpt, cb) { + volatile unsigned int cpt2 = cpt; if (!ran_ok) { - DEBUG4 - ("Use the callback #%d (@%p) for incomming msg '%s' (payload_size=%d)", - cpt + 1, cb, msg.type->name, msg.payl_size); - if (!(*cb) (&ctx, msg.payl)) { + XBT_DEBUG + ("Use the callback #%u (@%p) for incoming msg '%s' (payload_size=%d)", + cpt + 1, cb, msg.type->name, msg.payl_size); + if (!cb(&ctx, msg.payl)) { /* cb handled the message */ free(msg.payl); ran_ok = 1; } } + cpt = cpt2; } } CATCH(e) { @@ -422,12 +427,11 @@ void gras_msg_handle(double timeOut) e.host = (char *) gras_os_myname(); xbt_ex_setup_backtrace(&e); } - INFO5("Propagate %s exception ('%s') from '%s' RPC cb back to %s:%d", - (e.remote ? "remote" : "local"), - e.msg, - msg.type->name, - gras_socket_peer_name(msg.expe), - gras_socket_peer_port(msg.expe)); + XBT_VERB + ("Propagate %s exception ('%s') from '%s' RPC cb back to %s:%d", + (e.remote ? "remote" : "local"), e.msg, msg.type->name, + xbt_socket_peer_name(msg.expe), + xbt_socket_peer_port(msg.expe)); if (XBT_LOG_ISENABLED(gras_msg, xbt_log_priority_verbose)) xbt_ex_display(&e); gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror, @@ -437,45 +441,47 @@ void gras_msg_handle(double timeOut) ctx.answer_due = 0; ran_ok = 1; } else { - RETHROW4 - ("Callback #%d (@%p) to message '%s' (payload size: %d) raised an exception: %s", - cpt + 1, cb, msg.type->name, msg.payl_size); + RETHROWF + ("Callback #%u (@%p) to message '%s' (payload size: %d) raised an exception: %s", + cpt + 1, cb, msg.type->name, msg.payl_size); } } - xbt_assert1(!ctx.answer_due, + xbt_assert(!ctx.answer_due, "Bug in user code: RPC callback to message '%s' didn't call gras_msg_rpcreturn", msg.type->name); if (ctx.answer_due) - CRITICAL1 - ("BUGS BOTH IN USER CODE (RPC callback to message '%s' didn't call gras_msg_rpcreturn) " - "AND IN SIMGRID (process wasn't killed by an assert)", - msg.type->name); + XBT_CRITICAL + ("BUGS BOTH IN USER CODE (RPC callback to message '%s' didn't call gras_msg_rpcreturn) " + "AND IN SIMGRID (process wasn't killed by an assert)", + msg.type->name); if (!ran_ok) - THROW1(mismatch_error, 0, + THROWF(mismatch_error, 0, "Message '%s' refused by all registered callbacks (maybe your callback misses a 'return 0' at the end)", msg.type->name); - /* FIXME: gras_datadesc_free not implemented => leaking the payload */ + /* FIXME: xbt_datadesc_free not implemented => leaking the payload */ break; case e_gras_msg_kind_rpcanswer: - INFO3("Unexpected RPC answer discarded (type: %s; from:%s:%d)", - msg.type->name, gras_socket_peer_name(msg.expe), - gras_socket_peer_port(msg.expe)); - WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload"); + XBT_INFO("Unexpected RPC answer discarded (type: %s; from:%s:%d)", + msg.type->name, xbt_socket_peer_name(msg.expe), + xbt_socket_peer_port(msg.expe)); + XBT_WARN + ("FIXME: xbt_datadesc_free not implemented => leaking the payload"); return; case e_gras_msg_kind_rpcerror: - INFO3("Unexpected RPC error discarded (type: %s; from:%s:%d)", - msg.type->name, gras_socket_peer_name(msg.expe), - gras_socket_peer_port(msg.expe)); - WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload"); + XBT_INFO("Unexpected RPC error discarded (type: %s; from:%s:%d)", + msg.type->name, xbt_socket_peer_name(msg.expe), + xbt_socket_peer_port(msg.expe)); + XBT_WARN + ("FIXME: xbt_datadesc_free not implemented => leaking the payload"); return; default: - THROW1(unknown_error, 0, - "Cannot handle messages of kind %d yet", msg.type->kind); + THROWF(unknown_error, 0, + "Cannot handle messages of kind %d yet", (int)msg.type->kind); } }