Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Allow users to not care about who sended the message they were waiting for (and thus...
[simgrid.git] / src / gras / Msg / msg.c
index 50c529f..50f3c77 100644 (file)
@@ -13,6 +13,8 @@
 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
 #include "gras/Virtu/virtu_interface.h"
 
+#define MIN(a,b) ((a) < (b) ? (a) : (b))
+
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
 
 xbt_set_t _gras_msgtype_set = NULL;
@@ -25,8 +27,9 @@ static char *make_namev(const char *name, short int ver);
 static void *gras_msg_procdata_new() {
    gras_msg_procdata_t res = xbt_new(s_gras_msg_procdata_t,1);
    
-   res->msg_queue = xbt_dynar_new(sizeof(gras_msg_t),     NULL);
+   res->msg_queue = xbt_dynar_new(sizeof(s_gras_msg_t),   NULL);
    res->cbl_list  = xbt_dynar_new(sizeof(gras_cblist_t *),gras_cbl_free);
+   res->timers    = xbt_dynar_new(sizeof(s_gras_timer_t), NULL);
    
    return (void*)res;
 }
@@ -39,6 +42,9 @@ static void gras_msg_procdata_free(void *data) {
    
    xbt_dynar_free(&( res->msg_queue ));
    xbt_dynar_free(&( res->cbl_list ));
+   xbt_dynar_free(&( res->timers ));
+   
+   free(res);
 }
 
 /*
@@ -80,8 +86,8 @@ gras_msg_exit(void) {
 void gras_msgtype_free(void *t) {
   gras_msgtype_t msgtype=(gras_msgtype_t)t;
   if (msgtype) {
-    xbt_free(msgtype->name);
-    xbt_free(msgtype);
+    free(msgtype->name);
+    free(msgtype);
   }
 }
 
@@ -121,7 +127,7 @@ void gras_msgtype_declare(const char           *name,
  * @param version: something like versionning symbol
  * @param payload: datadescription of the payload
  *
- * Registers a message to the GRAS mecanism. Use this version instead of 
+ * Registers a message to the GRAS mechanism. Use this version instead of 
  * gras_msgtype_declare when you change the semantic or syntax of a message and
  * want your programs to be able to deal with both versions. Internally, each
  * will be handled as an independent message type, so you can register 
@@ -135,7 +141,7 @@ gras_msgtype_declare_v(const char           *name,
   xbt_error_t   errcode;
   gras_msgtype_t msgtype;
   char *namev=make_namev(name,version);
-  
+      
   errcode = xbt_set_get_by_name(_gras_msgtype_set,
                                 namev,(xbt_set_elm_t*)&msgtype);
 
@@ -184,7 +190,7 @@ gras_msgtype_t gras_msgtype_by_namev(const char      *name,
   if (!res) 
      WARN1("msgtype_by_name(%s) returns NULL",namev);
   if (name != namev) 
-    xbt_free(namev);
+    free(namev);
   
   return res;
 }
@@ -242,7 +248,7 @@ gras_msg_recv(gras_socket_t    sock,
   TRY(gras_trp_chunk_recv(sock, header, 6));
   for (cpt=0; cpt<4; cpt++)
     if (header[cpt] != GRAS_header[cpt])
-      RAISE0(mismatch_error,"Incoming bytes do not look like a GRAS message");
+      RAISE2(mismatch_error,"Incoming bytes do not look like a GRAS message (header='%.4s' not '%.4s')",header,GRAS_header);
   if (header[4] != GRAS_header[4]) 
     RAISE2(mismatch_error,"GRAS protocol mismatch (got %d, use %d)",
           (int)header[4], (int)GRAS_header[4]);
@@ -258,7 +264,7 @@ gras_msg_recv(gras_socket_t    sock,
           "Got error %s while retrieving the type associated to messages '%s'",
           xbt_error_name(errcode),msg_name);
   /* FIXME: Survive unknown messages */
-  xbt_free(msg_name);
+  free(msg_name);
 
   *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
   xbt_assert2(*payload_size > 0,
@@ -295,24 +301,25 @@ gras_msg_wait(double           timeout,
   double start, now;
   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
   int cpt;
-  gras_msg_t msg;
+  s_gras_msg_t msg;
+  gras_socket_t expeditor_res = NULL;
   
-  *expeditor = NULL;
   payload_got = NULL;
 
   if (!msgt_want)
     RAISE0(mismatch_error,
           "Cannot wait for the NULL message (did msgtype_by_name fail?)");
 
-  VERB1("Waiting for message %s",msgt_want->name);
+  VERB1("Waiting for message '%s'",msgt_want->name);
 
   start = now = gras_os_time();
 
   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
     if (msg.type->code == msgt_want->code) {
-      *expeditor = msg.expeditor;
+      if (expeditor)
+        *expeditor = msg.expeditor;
       memcpy(payload, msg.payload, msg.payload_size);
-      xbt_free(msg.payload);
+      free(msg.payload);
       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
       VERB0("The waited message was queued");
       return no_error;
@@ -320,19 +327,21 @@ gras_msg_wait(double           timeout,
   }
 
   while (1) {
-    TRY(gras_trp_select(timeout - now + start, expeditor));
-    TRY(gras_msg_recv(*expeditor, &msgt_got, &payload_got, &payload_size_got));
+    TRY(gras_trp_select(timeout - now + start, &expeditor_res));
+    TRY(gras_msg_recv(expeditor_res, &msgt_got, &payload_got, &payload_size_got));
     if (msgt_got->code == msgt_want->code) {
+      if (expeditor)
+       *expeditor=expeditor_res;
       memcpy(payload, payload_got, payload_size_got);
-      xbt_free(payload_got);
+      free(payload_got);
       VERB0("Got waited message");
       return no_error;
     }
 
     /* not expected msg type. Queue it for later */
-    msg.expeditor = *expeditor;
-    msg.type      =  msgt_got;
-    msg.payload   =  payload;
+    msg.expeditor = expeditor_res;
+    msg.type      = msgt_got;
+    msg.payload   = payload;
     msg.payload_size = payload_size_got;
     xbt_dynar_push(pd->msg_queue,&msg);
     
@@ -347,7 +356,7 @@ gras_msg_wait(double           timeout,
 
 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
  *
- * @param timeOut: How long to wait for incoming messages
+ * @param timeOut: How long to wait for incoming messages (in seconds)
  * @return the error code (or no_error).
  *
  * Messages are passed to the callbacks.
@@ -355,34 +364,76 @@ gras_msg_wait(double           timeout,
 xbt_error_t 
 gras_msg_handle(double timeOut) {
   
+  double          untiltimer;
+   
   xbt_error_t    errcode;
   int             cpt;
 
-  gras_msg_t      msg;
+  s_gras_msg_t    msg;
   gras_socket_t   expeditor;
   void           *payload=NULL;
   int             payload_size;
   gras_msgtype_t  msgtype;
 
   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
-  gras_cblist_t  *list;
+  gras_cblist_t  *list=NULL;
   gras_msg_cb_t       cb;
+   
+  int timerexpected;
 
   VERB1("Handling message within the next %.2fs",timeOut);
   
+  untiltimer = gras_msg_timer_handle();
+  DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
+  if (untiltimer == 0.0) {
+     /* A timer was already elapsed and handled */
+     return no_error;
+  }
+  if (untiltimer != -1.0) {
+     timerexpected = 1;
+     timeOut = MIN(timeOut, untiltimer);
+  } else {
+     timerexpected = 0;
+  }
+   
   /* get a message (from the queue or from the net) */
   if (xbt_dynar_length(pd->msg_queue)) {
+    DEBUG0("Get a message from the queue");
     xbt_dynar_shift(pd->msg_queue,&msg);
     expeditor = msg.expeditor;
     msgtype   = msg.type;
     payload   = msg.payload;
-    
+    errcode   = no_error;
   } else {
-    TRY(gras_trp_select(timeOut, &expeditor));
-    TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+    errcode = gras_trp_select(timeOut, &expeditor);
+    if (errcode != no_error && errcode != timeout_error)
+       return errcode;
+    if (errcode != timeout_error)
+       TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
   }
-      
-  /* handle it */
+
+  if (errcode == timeout_error ) {
+     if (timerexpected) {
+         
+       /* A timer elapsed before the arrival of any message even if we select()ed a bit */
+       untiltimer = gras_msg_timer_handle();
+       if (untiltimer == 0.0) {
+          return no_error;
+       } else {
+          xbt_assert1(untiltimer>0, "Negative timer (%f). I'm puzzeled", untiltimer);
+          ERROR1("No timer elapsed, in contrary to expectations (next in %f sec)",
+                 untiltimer);
+          return timeout_error;
+       }
+       
+     } else {
+       /* select timeouted, and no timer elapsed. Nothing to do */
+       return timeout_error;
+     }
+     
+  }
+   
+  /* A message was already there or arrived in the meanwhile. handle it */
   xbt_dynar_foreach(pd->cbl_list,cpt,list) {
     if (list->id == msgtype->code) {
       break;
@@ -402,7 +453,7 @@ gras_msg_handle(double timeOut) {
           cpt+1,cb,msgtype->name);
     if ((*cb)(expeditor,payload)) {
       /* cb handled the message */
-      xbt_free(payload);
+      free(payload);
       return no_error;
     }
   }
@@ -417,7 +468,7 @@ gras_cbl_free(void *data){
   gras_cblist_t *list=*(void**)data;
   if (list) {
     xbt_dynar_free(&( list->cbs ));
-    xbt_free(list);
+    free(list);
   }
 }
 
@@ -434,7 +485,7 @@ gras_cb_register(gras_msgtype_t msgtype,
   gras_cblist_t *list=NULL;
   int cpt;
 
-  DEBUG2("Register %p as callback to %s",cb,msgtype->name);
+  DEBUG2("Register %p as callback to '%s'",cb,msgtype->name);
 
   /* search the list of cb for this message on this host (creating if NULL) */
   xbt_dynar_foreach(pd->cbl_list,cpt,list) {