X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/8499055fbaebc9fd0d309ae43a6072c8f5b4a68e..b893d47aef7fcd5e09374f7620f262ad3d9857a7:/src/gras/Msg/msg.c diff --git a/src/gras/Msg/msg.c b/src/gras/Msg/msg.c index 9276115872..8ed2aefb1f 100644 --- a/src/gras/Msg/msg.c +++ b/src/gras/Msg/msg.c @@ -8,6 +8,7 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "xbt/ex.h" +#include "xbt/ex_interface.h" #include "gras/Msg/msg_private.h" #include "gras/Virtu/virtu_interface.h" #include "gras/DataDesc/datadesc_interface.h" @@ -246,11 +247,9 @@ gras_msgtype_t gras_msgtype_by_id(int id) { * @param timeout: How long should we wait for this message. * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message) * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant) - * @param payl_filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant) + * @param filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant) * @param filter_ctx: context passed as second argument of the filter (a pattern to match?) - * @param[out] msgt_got: where to write the descriptor of the message we got - * @param[out] expe_got: where to create a socket to answer the incomming message - * @param[out] payl_got: where to write the payload of the incomming message + * @param[out] msg_got: where to write the message we got * * Every message of another type received before the one waited will be queued * and used by subsequent call to this function or gras_msg_handle(). @@ -294,9 +293,9 @@ gras_msg_wait_ext(double timeout, while (1) { memset(&msg,sizeof(msg),0); - msg.expe = gras_trp_select(timeout - now + start); + msg.expe = gras_trp_select(timeout ? timeout - now + start : 0); gras_msg_recv(msg.expe, &msg); - DEBUG0("Here"); + DEBUG0("Got a message from the socket"); if ( ( !msgt_want || (msg.type->code == msgt_want->code)) && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe), @@ -304,14 +303,16 @@ gras_msg_wait_ext(double timeout, && (!filter || filter(&msg,filter_ctx))) { memcpy(msg_got,&msg,sizeof(s_gras_msg_t)); + DEBUG0("Message matches expectations. Use it."); return; } + DEBUG0("Message does not match expectations. Queue it."); /* not expected msg type. Queue it for later */ xbt_dynar_push(pd->msg_queue,&msg); now=gras_os_time(); - if (now - start + 0.001 < timeout) { + if (now - start + 0.001 > timeout) { THROW1(timeout_error, now-start+0.001-timeout, "Timeout while waiting for msg %s",msgt_want->name); } @@ -337,12 +338,17 @@ gras_msg_wait(double timeout, void *payload) { s_gras_msg_t msg; - return gras_msg_wait_ext(timeout, - msgt_want, NULL, NULL, NULL, - &msg); - memcpy(payload,msg.payl,msg.payl_size); - free(msg.payl); - *expeditor = msg.expe; + gras_msg_wait_ext(timeout, + msgt_want, NULL, NULL, NULL, + &msg); + + if (payload) { + memcpy(payload,msg.payl,msg.payl_size); + free(msg.payl); + } + + if (expeditor) + *expeditor = msg.expe; } @@ -356,19 +362,46 @@ gras_msg_send(gras_socket_t sock, gras_msg_send_ext(sock, e_gras_msg_kind_oneway,0, msgtype, payload); } +/** @brief Handle all messages arriving within the given period + * + * @param timeOut: How long to wait for incoming messages (in seconds) + * @return the error code (or no_error). + * + * Messages are dealed with just like gras_msg_handle() would do. The + * difference is that gras_msg_handle() handles at most one message (or wait up + * to timeout second when no message arrives) while this function handles any + * amount of messages, and lasts the given period in any case. + */ +void +gras_msg_handleall(double period) { + xbt_ex_t e; + double begin=gras_os_time(); + double now; + + do { + now=gras_os_time(); + TRY{ + gras_msg_handle(period - now + begin); + } CATCH(e) { + if (e.category != timeout_error) + RETHROW0("Error while waiting for messages: %s"); + xbt_ex_free(e); + } + } while (now - begin < period); +} /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds) * * @param timeOut: How long to wait for incoming messages (in seconds) * @return the error code (or no_error). * - * Messages are passed to the callbacks. + * Messages are passed to the callbacks. See also gras_msg_handleall(). */ void gras_msg_handle(double timeOut) { double untiltimer; - int cpt; + int cpt, ran_ok; s_gras_msg_t msg; @@ -383,7 +416,7 @@ gras_msg_handle(double timeOut) { VERB1("Handling message within the next %.2fs",timeOut); untiltimer = gras_msg_timer_handle(); - DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer); + DEBUG1("Next timer in %f sec", untiltimer); if (untiltimer == 0.0) { /* A timer was already elapsed and handled */ return; @@ -414,11 +447,14 @@ gras_msg_handle(double timeOut) { TRY { /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */ gras_msg_recv(msg.expe, &msg); - DEBUG0("Here"); + DEBUG1("Received a msg from the socket kind:%s", + e_gras_msg_kind_names[msg.kind]); } CATCH(e) { - RETHROW1("Error caught while receiving a message on select()ed socket %p: %s", - msg.expe); + RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s", + msg.expe, + gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe), + gras_socket_peer_port(msg.expe)); } } } @@ -442,7 +478,8 @@ gras_msg_handle(double timeOut) { } else { /* select timeouted, and no timer elapsed. Nothing to do */ - THROW0(timeout_error, 0, "No new message or timer"); + THROW1(timeout_error, 0, "No new message or timer (delay was %f)", + timeOut); } } @@ -466,38 +503,47 @@ gras_msg_handle(double timeOut) { ctx.ID = msg.ID; ctx.msgtype = msg.type; - switch (msg.type->kind) { + switch (msg.kind) { case e_gras_msg_kind_oneway: case e_gras_msg_kind_rpccall: + ran_ok=0; TRY { xbt_dynar_foreach(list->cbs,cpt,cb) { - VERB3("Use the callback #%d (@%p) for incomming msg %s", - cpt+1,cb,msg.type->name); - if ((*cb)(&ctx,msg.payl)) { - /* cb handled the message */ - free(msg.payl); - return; + if (!ran_ok) { + VERB3("Use the callback #%d (@%p) for incomming msg %s", + cpt+1,cb,msg.type->name); + if ((*cb)(&ctx,msg.payl)) { + /* cb handled the message */ + free(msg.payl); + ran_ok = 1; + } } } } CATCH(e) { + free(msg.payl); if (msg.type->kind == e_gras_msg_kind_rpccall) { /* The callback raised an exception, propagate it on the network */ - e.host = (char*)gras_os_myname(); -#ifdef HAVE_EXECINFO_H - e.bt_strings = backtrace_symbols (e.bt, e.used); -#endif - gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror , msg.ID, msg.type, &e); - e.host = NULL; - INFO2("RPC callback raised an exception, which were propagated back to %s:%d", - gras_socket_peer_name(msg.expe), gras_socket_peer_port(msg.expe)); + if (!e.remote) { /* the exception is born on this machine */ + e.host = (char*)gras_os_myname(); + xbt_ex_setup_backtrace(&e); + } + VERB4("Propagate %s exception from '%s' RPC cb back to %s:%d", + (e.remote ? "remote" : "local"), + msg.type->name, + gras_socket_peer_name(msg.expe), + gras_socket_peer_port(msg.expe)); + gras_msg_send_ext(msg.expe, e_gras_msg_kind_rpcerror, + msg.ID, msg.type, &e); xbt_ex_free(e); - return; + ran_ok=1; + } else { + RETHROW0("Callback raised an exception: %s"); } - RETHROW; } + if (!ran_ok) + THROW1(mismatch_error,0, + "Message '%s' refused by all registered callbacks", msg.type->name); /* FIXME: gras_datadesc_free not implemented => leaking the payload */ - THROW1(mismatch_error,0, - "Message '%s' refused by all registered callbacks", msg.type->name); break; @@ -513,7 +559,7 @@ gras_msg_handle(double timeOut) { default: THROW1(unknown_error,0, - "Cannot handle messages of kind %d yet",msg.type->kind); + "Cannot handle messages of kind %d yet",msg.type->kind); } }