Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
This data is used within the TRY/CATCH block; mark it volatile to avoid issues on...
[simgrid.git] / src / gras / Msg / msg.c
index 33cee8c..d6cc3a4 100644 (file)
@@ -7,19 +7,19 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-
+#include "xbt/ex.h"
 #include "gras/Msg/msg_private.h"
-#include "gras/DataDesc/datadesc_interface.h"
-#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
 #include "gras/Virtu/virtu_interface.h"
+#include "gras/DataDesc/datadesc_interface.h"
+#include "gras/Transport/transport_interface.h" /* gras_select */
 
 #define MIN(a,b) ((a) < (b) ? (a) : (b))
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg,gras,"High level messaging");
 
 xbt_set_t _gras_msgtype_set = NULL;
-static char GRAS_header[6];
 static char *make_namev(const char *name, short int ver);
+char _GRAS_header[6];
 
 /*
  * Creating procdata for this module
@@ -43,6 +43,8 @@ static void gras_msg_procdata_free(void *data) {
    xbt_dynar_free(&( res->msg_queue ));
    xbt_dynar_free(&( res->cbl_list ));
    xbt_dynar_free(&( res->timers ));
+   
+   free(res);
 }
 
 /*
@@ -64,9 +66,9 @@ void gras_msg_init(void) {
   
   _gras_msgtype_set = xbt_set_new();
 
-  memcpy(GRAS_header,"GRAS", 4);
-  GRAS_header[4]=GRAS_PROTOCOL_VERSION;
-  GRAS_header[5]=(char)GRAS_THISARCH;
+  memcpy(_GRAS_header,"GRAS", 4);
+  _GRAS_header[4]=GRAS_PROTOCOL_VERSION;
+  _GRAS_header[5]=(char)GRAS_THISARCH;
 }
 
 /*
@@ -136,14 +138,21 @@ gras_msgtype_declare_v(const char           *name,
                       short int             version,
                       gras_datadesc_type_t  payload) {
  
-  xbt_error_t   errcode;
-  gras_msgtype_t msgtype;
+  gras_msgtype_t msgtype=NULL;
   char *namev=make_namev(name,version);
+  volatile int found = 0;
+  xbt_ex_t e;    
   
-  errcode = xbt_set_get_by_name(_gras_msgtype_set,
-                                namev,(xbt_set_elm_t*)&msgtype);
+  TRY {
+    msgtype = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,namev);
+    found = 1;
+  } CATCH(e) {
+    if (e.category != not_found_error)
+      RETHROW;
+    xbt_ex_free(e);
+  }
 
-  if (errcode == no_error) {
+  if (found) {
     VERB2("Re-register version %d of message '%s' (same payload, ignored).",
          version, name);
     xbt_assert3(!gras_datadesc_type_cmp(msgtype->ctn_type, payload),
@@ -154,7 +163,7 @@ gras_msgtype_declare_v(const char           *name,
     return ; /* do really ignore it */
 
   }
-  xbt_assert_error(mismatch_error); /* expect this error */
+
   VERB3("Register version %d of message '%s' (payload: %s).", 
        version, name, gras_datadesc_get_name(payload));    
 
@@ -177,102 +186,17 @@ gras_msgtype_t gras_msgtype_by_name (const char *name) {
 gras_msgtype_t gras_msgtype_by_namev(const char      *name,
                                     short int        version) {
   gras_msgtype_t res;
-
-  xbt_error_t errcode;
   char *namev = make_namev(name,version); 
 
-  errcode = xbt_set_get_by_name(_gras_msgtype_set, namev,
-                                (xbt_set_elm_t*)&res);
-  if (errcode != no_error)
-    res = NULL;
-  if (!res) 
-     WARN1("msgtype_by_name(%s) returns NULL",namev);
+  res = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set, namev);
   if (name != namev) 
     free(namev);
   
   return res;
 }
-
-/** \brief Send the data pointed by \a payload as a message of type
- * \a msgtype to the peer \a sock */
-xbt_error_t
-gras_msg_send(gras_socket_t   sock,
-             gras_msgtype_t  msgtype,
-             void           *payload) {
-
-  xbt_error_t errcode;
-  static gras_datadesc_type_t string_type=NULL;
-
-  if (!msgtype)
-    RAISE0(mismatch_error,
-          "Cannot send the NULL message (did msgtype_by_name fail?)");
-
-  if (!string_type) {
-    string_type = gras_datadesc_by_name("string");
-    xbt_assert(string_type);
-  }
-
-  DEBUG3("send '%s' to %s:%d", msgtype->name, 
-        gras_socket_peer_name(sock),gras_socket_peer_port(sock));
-  TRY(gras_trp_chunk_send(sock, GRAS_header, 6));
-
-  TRY(gras_datadesc_send(sock, string_type,   &msgtype->name));
-  TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
-  TRY(gras_trp_flush(sock));
-
-  return no_error;
-}
-/*
- * receive the next message on the given socket.  
- */
-xbt_error_t
-gras_msg_recv(gras_socket_t    sock,
-             gras_msgtype_t  *msgtype,
-             void           **payload,
-             int             *payload_size) {
-
-  xbt_error_t errcode;
-  static gras_datadesc_type_t string_type=NULL;
-  char header[6];
-  int cpt;
-  int r_arch;
-  char *msg_name=NULL;
-
-  if (!string_type) {
-    string_type=gras_datadesc_by_name("string");
-    xbt_assert(string_type);
-  }
-  
-  TRY(gras_trp_chunk_recv(sock, header, 6));
-  for (cpt=0; cpt<4; cpt++)
-    if (header[cpt] != GRAS_header[cpt])
-      RAISE0(mismatch_error,"Incoming bytes do not look like a GRAS message");
-  if (header[4] != GRAS_header[4]) 
-    RAISE2(mismatch_error,"GRAS protocol mismatch (got %d, use %d)",
-          (int)header[4], (int)GRAS_header[4]);
-  r_arch = (int)header[5];
-  DEBUG2("Handle an incoming message using protocol %d (remote is %s)",
-        (int)header[4],gras_datadesc_arch_name(r_arch));
-
-  TRY(gras_datadesc_recv(sock, string_type, r_arch, &msg_name));
-  errcode = xbt_set_get_by_name(_gras_msgtype_set,
-                                msg_name,(xbt_set_elm_t*)msgtype);
-  if (errcode != no_error)
-    RAISE2(errcode,
-          "Got error %s while retrieving the type associated to messages '%s'",
-          xbt_error_name(errcode),msg_name);
-  /* FIXME: Survive unknown messages */
-  free(msg_name);
-
-  *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
-  xbt_assert2(*payload_size > 0,
-              "%s %s",
-              "Dynamic array as payload is forbided for now (FIXME?).",
-              "Reference to dynamic array is allowed.");
-  *payload = xbt_malloc(*payload_size);
-  TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload));
-
-  return no_error;
+/** @brief retrive an existing message type from its name and version. */
+gras_msgtype_t gras_msgtype_by_id(int id) {
+  return (gras_msgtype_t)xbt_set_get_by_id(_gras_msgtype_set, id);
 }
 
 /** \brief Waits for a message to come in over a given socket. 
@@ -286,7 +210,7 @@ gras_msg_recv(gras_socket_t    sock,
  * Every message of another type received before the one waited will be queued
  * and used by subsequent call to this function or gras_msg_handle().
  */
-xbt_error_t
+void
 gras_msg_wait(double           timeout,    
              gras_msgtype_t   msgt_want,
              gras_socket_t   *expeditor,
@@ -295,58 +219,59 @@ gras_msg_wait(double           timeout,
   gras_msgtype_t msgt_got;
   void *payload_got;
   int payload_size_got;
-  xbt_error_t errcode;
   double start, now;
   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
   int cpt;
   s_gras_msg_t msg;
+  gras_socket_t expeditor_res = NULL;
   
-  *expeditor = NULL;
   payload_got = NULL;
 
-  if (!msgt_want)
-    RAISE0(mismatch_error,
-          "Cannot wait for the NULL message (did msgtype_by_name fail?)");
+  xbt_assert0(msgt_want,"Cannot wait for the NULL message");
 
-  VERB1("Waiting for message %s",msgt_want->name);
+  VERB1("Waiting for message '%s'",msgt_want->name);
 
   start = now = gras_os_time();
 
   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
     if (msg.type->code == msgt_want->code) {
-      *expeditor = msg.expeditor;
+      if (expeditor)
+        *expeditor = msg.expeditor;
       memcpy(payload, msg.payload, msg.payload_size);
       free(msg.payload);
       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
       VERB0("The waited message was queued");
-      return no_error;
+      return;
     }
   }
 
   while (1) {
-    TRY(gras_trp_select(timeout - now + start, expeditor));
-    TRY(gras_msg_recv(*expeditor, &msgt_got, &payload_got, &payload_size_got));
+    expeditor_res = gras_trp_select(timeout - now + start);
+    gras_msg_recv(expeditor_res, &msgt_got, &payload_got, &payload_size_got);
     if (msgt_got->code == msgt_want->code) {
+      if (expeditor)
+       *expeditor=expeditor_res;
       memcpy(payload, payload_got, payload_size_got);
       free(payload_got);
       VERB0("Got waited message");
-      return no_error;
+      return;
     }
 
     /* not expected msg type. Queue it for later */
-    msg.expeditor = *expeditor;
-    msg.type      =  msgt_got;
-    msg.payload   =  payload;
+    msg.expeditor = expeditor_res;
+    msg.type      = msgt_got;
+    msg.payload   = payload;
     msg.payload_size = payload_size_got;
     xbt_dynar_push(pd->msg_queue,&msg);
     
     now=gras_os_time();
     if (now - start + 0.001 < timeout) {
-      RAISE1(timeout_error,"Timeout while waiting for msg %s",msgt_want->name);
+      THROW1(timeout_error,  now-start+0.001-timeout,
+            "Timeout while waiting for msg %s",msgt_want->name);
     }
   }
 
-  RAISE_IMPOSSIBLE;
+  THROW_IMPOSSIBLE;
 }
 
 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
@@ -356,16 +281,15 @@ gras_msg_wait(double           timeout,
  *
  * Messages are passed to the callbacks.
  */
-xbt_error_t 
+void
 gras_msg_handle(double timeOut) {
   
   double          untiltimer;
    
-  xbt_error_t    errcode;
   int             cpt;
 
   s_gras_msg_t    msg;
-  gras_socket_t   expeditor;
+  gras_socket_t   expeditor=NULL;
   void           *payload=NULL;
   int             payload_size;
   gras_msgtype_t  msgtype;
@@ -374,7 +298,8 @@ gras_msg_handle(double timeOut) {
   gras_cblist_t  *list=NULL;
   gras_msg_cb_t       cb;
    
-  int timerexpected;
+  int timerexpected, timeouted;
+  xbt_ex_t e;
 
   VERB1("Handling message within the next %.2fs",timeOut);
   
@@ -382,7 +307,7 @@ gras_msg_handle(double timeOut) {
   DEBUG2("[%.0f] Next timer in %f sec", gras_os_time(), untiltimer);
   if (untiltimer == 0.0) {
      /* A timer was already elapsed and handled */
-     return no_error;
+     return;
   }
   if (untiltimer != -1.0) {
      timerexpected = 1;
@@ -392,38 +317,53 @@ gras_msg_handle(double timeOut) {
   }
    
   /* get a message (from the queue or from the net) */
+  timeouted = 0;
   if (xbt_dynar_length(pd->msg_queue)) {
     DEBUG0("Get a message from the queue");
     xbt_dynar_shift(pd->msg_queue,&msg);
     expeditor = msg.expeditor;
     msgtype   = msg.type;
     payload   = msg.payload;
-    errcode   = no_error;
   } else {
-    errcode = gras_trp_select(timeOut, &expeditor);
-    if (errcode != no_error && errcode != timeout_error)
-       return errcode;
-    if (errcode != timeout_error)
-       TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+    TRY {
+      expeditor = gras_trp_select(timeOut);
+    } CATCH(e) {
+      if (e.category != timeout_error)
+       RETHROW;
+      xbt_ex_free(e);
+      timeouted = 1;
+    }
+
+    if (!timeouted) {
+      TRY {
+       gras_msg_recv(expeditor, &msgtype, &payload, &payload_size);
+      } CATCH(e) {
+       RETHROW1("Error caught  while receiving a message on select()ed socket %p: %s",
+                expeditor);
+      }
+    }
   }
 
-  if (errcode == timeout_error ) {
+  if (timeouted) {
      if (timerexpected) {
          
        /* A timer elapsed before the arrival of any message even if we select()ed a bit */
        untiltimer = gras_msg_timer_handle();
        if (untiltimer == 0.0) {
-          return no_error;
+         /* we served a timer, we're done */
+         return;
        } else {
-          xbt_assert1(untiltimer>0, "Negative timer (%f). I'm puzzeled", untiltimer);
-          ERROR1("No timer elapsed, in contrary to expectations (next in %f sec)",
+          xbt_assert1(untiltimer>0, "Negative timer (%f). I'm 'puzzeled'", untiltimer);
+          WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
+                 untiltimer);
+          THROW1(timeout_error,0,
+                 "No timer elapsed, in contrary to expectations (next in %f sec)",
                  untiltimer);
-          return timeout_error;
        }
        
      } else {
        /* select timeouted, and no timer elapsed. Nothing to do */
-       return timeout_error;
+       THROW0(timeout_error, 0, "No new message or timer");
      }
      
   }
@@ -440,7 +380,7 @@ gras_msg_handle(double timeOut) {
     INFO1("No callback for the incomming '%s' message. Discarded.", 
          msgtype->name);
     WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
-    return no_error;
+    return;
   }
   
   xbt_dynar_foreach(list->cbs,cpt,cb) { 
@@ -449,13 +389,13 @@ gras_msg_handle(double timeOut) {
     if ((*cb)(expeditor,payload)) {
       /* cb handled the message */
       free(payload);
-      return no_error;
+      return;
     }
   }
 
-  INFO1("Message '%s' refused by all registered callbacks", msgtype->name);
-  WARN0("FIXME: gras_datadesc_free not implemented => leaking the payload");
-  return mismatch_error;
+  /* FIXME: gras_datadesc_free not implemented => leaking the payload */
+  THROW1(mismatch_error,0,
+        "Message '%s' refused by all registered callbacks", msgtype->name);
 }
 
 void
@@ -480,7 +420,7 @@ gras_cb_register(gras_msgtype_t msgtype,
   gras_cblist_t *list=NULL;
   int cpt;
 
-  DEBUG2("Register %p as callback to %s",cb,msgtype->name);
+  DEBUG2("Register %p as callback to '%s'",cb,msgtype->name);
 
   /* search the list of cb for this message on this host (creating if NULL) */
   xbt_dynar_foreach(pd->cbl_list,cpt,list) {