#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
#include "gras/Virtu/virtu_interface.h"
/* Smaller of two values. This is a function-like macro, so each argument may
 * be evaluated twice: only pass expressions without side effects. */
+#define MIN(a,b) ((a) < (b) ? (a) : (b))
+
/* Logging category "gras_msg", declared as a subcategory of "gras" and used as
 * the default category for the log macros in this file (presumably; confirm
 * against the xbt log documentation). */
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
/* File-scope set, initially NULL. NOTE(review): the name suggests it holds the
 * registered message types; its population is not visible here. */
xbt_set_t _gras_msgtype_set = NULL;
/*
 * Allocate and initialise the data block of the messaging module.
 *
 * One s_gras_msg_procdata_t is allocated with xbt_new() and its three dynamic
 * arrays are created:
 *   - msg_queue: elements of sizeof(s_gras_msg_t), i.e. whole message
 *     structures stored by value;
 *   - cbl_list:  elements of sizeof(gras_cblist_t *), created with
 *     gras_cbl_free (presumably the per-element destructor; confirm against
 *     xbt_dynar_new);
 *   - timers:    elements of sizeof(s_gras_timer_t), stored by value.
 *
 * Returns the new block as a void*. NOTE(review): presumably this is the
 * object later retrieved with gras_libdata_get("gras_msg"); the registration
 * is not visible here.
 */
static void *gras_msg_procdata_new() {
gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
/* The element size of the queue goes from sizeof(gras_msg_t) to
 * sizeof(s_gras_msg_t): readers shift entries into a local s_gras_msg_t. */
- res->msg_queue = xbt_dynar_new(sizeof(gras_msg_t), NULL);
+ res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t), NULL);
res->cbl_list = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
+ res->timers = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
return (void*)res;
}
xbt_dynar_free(&( res->msg_queue ));
xbt_dynar_free(&( res->cbl_list ));
+ xbt_dynar_free(&( res->timers ));
}
/*
double start, now;
gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
int cpt;
- gras_msg_t msg;
+ s_gras_msg_t msg;
*expeditor = NULL;
payload_got = NULL;
xbt_error_t
gras_msg_handle(double timeOut) {
+ double untiltimer;
+
xbt_error_t errcode;
int cpt;
- gras_msg_t msg;
+ s_gras_msg_t msg;
gras_socket_t expeditor;
void *payload=NULL;
int payload_size;
VERB1("Handling message within the next %.2fs",timeOut);
+ untiltimer = gras_msg_timer_handle();
+ DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
+ if (untiltimer == 0.0) {
+ /* A timer had already elapsed */
+ return no_error;
+ }
+
/* get a message (from the queue or from the net) */
if (xbt_dynar_length(pd->msg_queue)) {
xbt_dynar_shift(pd->msg_queue,&msg);
expeditor = msg.expeditor;
msgtype = msg.type;
payload = msg.payload;
-
+ errcode = no_error;
} else {
- TRY(gras_trp_select(timeOut, &expeditor));
- TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+ errcode = gras_trp_select(MIN(timeOut,untiltimer), &expeditor);
+ if (errcode != no_error && errcode != timeout_error)
+ return errcode;
+ if (errcode != timeout_error)
+ TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+ }
+
+ if (errcode == timeout_error && untiltimer < timeOut) {
+ /* A timer elapsed before any message arrived, even though we select()ed for a while */
+ untiltimer = gras_msg_timer_handle();
+ if (untiltimer == 0.0) {
+ return no_error;
+ } else {
+ WARN1("Weird. I computed that a timer should elapse shortly, but none did (I still should wait %f sec)",
+ untiltimer);
+ return timeout_error;
+ }
}
-
- /* handle it */
+
+ /* A message was already queued or arrived in the meantime: handle it */
xbt_dynar_foreach(pd->cbl_list,cpt,list) {
if (list->id == msgtype->code) {
break;