X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/6d17036094433f0304c29b6f57922cee32b0c67c..be7098ec6b72994a7d4effce9566da762ea04ef7:/src/gras/Msg/gras_msg_exchange.c diff --git a/src/gras/Msg/gras_msg_exchange.c b/src/gras/Msg/gras_msg_exchange.c index 65b30f9eee..e15f8f8315 100644 --- a/src/gras/Msg/gras_msg_exchange.c +++ b/src/gras/Msg/gras_msg_exchange.c @@ -12,7 +12,6 @@ #include "xbt/ex_interface.h" #include "gras/Msg/msg_private.h" #include "gras/Virtu/virtu_interface.h" -#include "gras/Transport/transport_interface.h" /* gras_select */ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg); @@ -49,12 +48,12 @@ gras_msg_wait_ext_(double timeout, double start, now; gras_msg_procdata_t pd= (gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id); - int cpt; + unsigned int cpt; xbt_assert0(msg_got,"msg_got is an output parameter"); start = gras_os_time(); - VERB1("Waiting for message '%s'",msgt_want?msgt_want->name:"(any)"); + VERB2("Waiting for message '%s' for %fs",msgt_want?msgt_want->name:"(any)", timeout); xbt_dynar_foreach(pd->msg_waitqueue,cpt,msg){ if ( ( !msgt_want || (msg.type->code == msgt_want->code)) @@ -92,8 +91,7 @@ gras_msg_wait_ext_(double timeout, memset(&msg,sizeof(msg),0); TRY { - msg.expe = gras_trp_select(timeout ? timeout - now + start : 0); - gras_msg_recv(msg.expe, &msg); + xbt_queue_shift_timed(pd->msg_received,&msg,timeout ? timeout - now + start : 0); } CATCH(e) { if (e.category == system_error && !strncmp("Socket closed by remote side",e.msg, @@ -280,7 +278,8 @@ gras_msg_handleall(double period) { RETHROW0("Error while waiting for messages: %s"); xbt_ex_free(e); } - } while (now - begin < period); + /* Epsilon to avoid numerical stability issues were the waited interval is so small that the global clock cannot notice the increment */ + } while (period - now + begin > 0); } /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds) @@ -288,14 +287,16 @@ gras_msg_handleall(double period) { * @param timeOut: How long to wait for incoming messages (in seconds) * @return the error code (or no_error). * - * Messages are passed to the callbacks. See also gras_msg_handleall(). + * Any message arriving in the given interval is passed to the callbacks. + * + * @sa gras_msg_handleall(). */ void gras_msg_handle(double timeOut) { double untiltimer; - int cpt; + unsigned int cpt; int volatile ran_ok; s_gras_msg_t msg; @@ -330,28 +331,15 @@ gras_msg_handle(double timeOut) { xbt_dynar_shift(pd->msg_queue,&msg); } else { TRY { - msg.expe = gras_trp_select(timeOut); + xbt_queue_shift_timed(pd->msg_received,&msg,timeOut); +// msg.expe = gras_trp_select(timeOut); } CATCH(e) { if (e.category != timeout_error) RETHROW; + DEBUG0("Damn. Timeout while getting a message from the queue"); xbt_ex_free(e); timeouted = 1; } - - if (!timeouted) { - TRY { - /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */ - gras_msg_recv(msg.expe, &msg); - DEBUG1("Received a msg from the socket kind:%s", - e_gras_msg_kind_names[msg.kind]); - - } CATCH(e) { - RETHROW4("Error while receiving a message on select()ed socket %p to [%s]%s:%d: %s", - msg.expe, - gras_socket_peer_proc(msg.expe),gras_socket_peer_name(msg.expe), - gras_socket_peer_port(msg.expe)); - } - } } if (timeouted) { @@ -446,23 +434,28 @@ gras_msg_handle(double timeOut) { RETHROW0("Callback raised an exception: %s"); } } - xbt_assert0(!(ctx.answer_due), - "RPC callback didn't call gras_msg_rpcreturn"); + xbt_assert1(! ctx.answer_due, + "Bug in user code: RPC callback to message '%s' didn't call gras_msg_rpcreturn",msg.type->name); + if (ctx.answer_due) + CRITICAL1("BUGS BOTH IN USER CODE (RPC callback to message '%s' didn't call gras_msg_rpcreturn) " + "AND IN SIMGRID (process wasn't killed by an assert)",msg.type->name); if (!ran_ok) THROW1(mismatch_error,0, - "Message '%s' refused by all registered callbacks", msg.type->name); + "Message '%s' refused by all registered callbacks (maybe your callback misses a 'return 0' at the end)", msg.type->name); /* FIXME: gras_datadesc_free not implemented => leaking the payload */ break; case e_gras_msg_kind_rpcanswer: - INFO1("Unexpected RPC answer discarded (type: %s)", msg.type->name); + INFO3("Unexpected RPC answer discarded (type: %s; from:%s:%d)", msg.type->name, + gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe)); WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload"); return; case e_gras_msg_kind_rpcerror: - INFO1("Unexpected RPC error discarded (type: %s)", msg.type->name); + INFO3("Unexpected RPC error discarded (type: %s; from:%s:%d)", msg.type->name, + gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe)); WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload"); return;