#include "xbt/ex_interface.h"
#include "gras/Msg/msg_private.h"
#include "gras/Virtu/virtu_interface.h"
-#include "gras/Transport/transport_interface.h" /* gras_select */
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(gras_msg);
double start, now;
gras_msg_procdata_t pd=
(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
- int cpt;
+ unsigned int cpt;
xbt_assert0(msg_got,"msg_got is an output parameter");
start = gras_os_time();
- VERB1("Waiting for message '%s'",msgt_want?msgt_want->name:"(any)");
+ VERB2("Waiting for message '%s' for %fs",msgt_want?msgt_want->name:"(any)", timeout);
xbt_dynar_foreach(pd->msg_waitqueue,cpt,msg){
if ( ( !msgt_want || (msg.type->code == msgt_want->code))
memset(&msg,sizeof(msg),0);
TRY {
- xbt_queue_shift_timed(pd->msg_received,&msg,timeout ? timeout - now + start : 0);
- /*
- msg.expe = gras_trp_select(timeout ? timeout - now + start : 0);
- gras_msg_recv(msg.expe, &msg);
- */
+ xbt_queue_shift_timed(pd->msg_received,&msg,timeout ? timeout - now + start : 0);
} CATCH(e) {
if (e.category == system_error &&
!strncmp("Socket closed by remote side",e.msg,
RETHROW0("Error while waiting for messages: %s");
xbt_ex_free(e);
}
- } while (now - begin < period);
+ /* Epsilon to avoid numerical stability issues were the waited interval is so small that the global clock cannot notice the increment */
+ } while (period - now + begin > 0);
}
/** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
double untiltimer;
- int cpt;
+ unsigned int cpt;
int volatile ran_ok;
s_gras_msg_t msg;
xbt_dynar_shift(pd->msg_queue,&msg);
} else {
TRY {
- xbt_queue_shift_timed(pd->msg_received,&msg,timeOut);
+ xbt_queue_shift_timed(pd->msg_received,&msg,timeOut);
// msg.expe = gras_trp_select(timeOut);
} CATCH(e) {
if (e.category != timeout_error)
RETHROW;
+ DEBUG0("Damn. Timeout while getting a message from the queue");
xbt_ex_free(e);
timeouted = 1;
}
"AND IN SIMGRID (process wasn't killed by an assert)",msg.type->name);
if (!ran_ok)
THROW1(mismatch_error,0,
- "Message '%s' refused by all registered callbacks", msg.type->name);
+ "Message '%s' refused by all registered callbacks (maybe your callback misses a 'return 0' at the end)", msg.type->name);
/* FIXME: gras_datadesc_free not implemented => leaking the payload */
break;
case e_gras_msg_kind_rpcanswer:
- INFO1("Unexpected RPC answer discarded (type: %s)", msg.type->name);
+ INFO3("Unexpected RPC answer discarded (type: %s; from:%s:%d)", msg.type->name,
+ gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe));
WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
return;
case e_gras_msg_kind_rpcerror:
- INFO1("Unexpected RPC error discarded (type: %s)", msg.type->name);
+ INFO3("Unexpected RPC error discarded (type: %s; from:%s:%d)", msg.type->name,
+ gras_socket_peer_name(msg.expe),gras_socket_peer_port(msg.expe));
WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
return;