Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add timers to GRAS
[simgrid.git] / src / gras / Msg / msg.c
index 50c529f..cb358b3 100644 (file)
@@ -13,6 +13,8 @@
 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
 #include "gras/Virtu/virtu_interface.h"
 
+#define MIN(a,b) ((a) < (b) ? (a) : (b))
+
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
 
 xbt_set_t _gras_msgtype_set = NULL;
@@ -25,8 +27,9 @@ static char *make_namev(const char *name, short int ver);
 static void *gras_msg_procdata_new() {
    gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
    
-   res->msg_queue = xbt_dynar_new(sizeof(gras_msg_t),     NULL);
+   res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t),   NULL);
    res->cbl_list  = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
+   res->timers    = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
    
    return (void*)res;
 }
@@ -39,6 +42,7 @@ static void gras_msg_procdata_free(void *data) {
    
    xbt_dynar_free(&( res->msg_queue ));
    xbt_dynar_free(&( res->cbl_list ));
+   xbt_dynar_free(&( res->timers ));
 }
 
 /*
@@ -295,7 +299,7 @@ gras_msg_wait(double           timeout,
   double start, now;
   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
   int cpt;
-  gras_msg_t msg;
+  s_gras_msg_t msg;
   
   *expeditor = NULL;
   payload_got = NULL;
@@ -355,10 +359,12 @@ gras_msg_wait(double           timeout,
 xbt_error_t 
 gras_msg_handle(double timeOut) {
   
+  double          untiltimer;
+   
   xbt_error_t    errcode;
   int             cpt;
 
-  gras_msg_t      msg;
+  s_gras_msg_t    msg;
   gras_socket_t   expeditor;
   void           *payload=NULL;
   int             payload_size;
@@ -370,19 +376,41 @@ gras_msg_handle(double timeOut) {
 
   VERB1("Handling message within the next %.2fs",timeOut);
   
+  untiltimer = gras_msg_timer_handle();
+  DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
+  if (untiltimer == 0.0) {
+     /* A timer was already elapsed */
+     return no_error;
+  }
+   
   /* get a message (from the queue or from the net) */
   if (xbt_dynar_length(pd->msg_queue)) {
     xbt_dynar_shift(pd->msg_queue,&msg);
     expeditor = msg.expeditor;
     msgtype   = msg.type;
     payload   = msg.payload;
-    
+    errcode   = no_error;
   } else {
-    TRY(gras_trp_select(timeOut, &expeditor));
-    TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+    errcode = gras_trp_select(MIN(timeOut,untiltimer), &expeditor);
+    if (errcode != no_error && errcode != timeout_error)
+       return errcode;
+    if (errcode != timeout_error)
+       TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+  }
+
+  if (errcode == timeout_error && untiltimer < timeOut) {
+     /* A timer elapsed before the arrival of any message even if we select()ed a bit */
+     untiltimer = gras_msg_timer_handle();
+     if (untiltimer == 0.0) {
+       return no_error;
+     } else {
+       WARN1("Weird. I computed that a timer should elapse shortly, but none did (I still should wait %f sec)",
+             untiltimer);
+       return timeout_error;
+     }
   }
-      
-  /* handle it */
+   
+  /* A message was already there or arrived in the meanwhile. handle it */
   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
     if (list->id == msgtype->code) {
       break;