Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Improve documentation and put the examples in another file, so that we can include...
[simgrid.git] / src / gras / Msg / msg.c
index e705dbc..ce2161b 100644 (file)
@@ -219,7 +219,8 @@ gras_msg_send(gras_socket_t   sock,
   TRY(gras_trp_chunk_send(sock, GRAS_header, 6));
 
   TRY(gras_datadesc_send(sock, string_type,   &msgtype->name));
-  TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
+  if (msgtype->ctn_type)
+    TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
   TRY(gras_trp_flush(sock));
 
   return no_error;
@@ -240,12 +241,17 @@ gras_msg_recv(gras_socket_t    sock,
   int r_arch;
   char *msg_name=NULL;
 
+  xbt_assert1(!gras_socket_is_meas(sock), 
+             "Asked to receive a message on the measurement socket %p", sock);
   if (!string_type) {
     string_type=gras_datadesc_by_name("string");
     xbt_assert(string_type);
   }
   
-  TRY(gras_trp_chunk_recv(sock, header, 6));
+  errcode=gras_trp_chunk_recv(sock, header, 6);
+  if (errcode!=no_error)
+    RAISE2(errcode,"Got '%s' while trying to get the mesage header on socket %p",
+          xbt_error_name(errcode),sock);
   for (cpt=0; cpt<4; cpt++)
     if (header[cpt] != GRAS_header[cpt])
       RAISE2(mismatch_error,"Incoming bytes do not look like a GRAS message (header='%.4s' not '%.4s')",header,GRAS_header);
@@ -266,14 +272,18 @@ gras_msg_recv(gras_socket_t    sock,
   /* FIXME: Survive unknown messages */
   free(msg_name);
 
-  *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
-  xbt_assert2(*payload_size > 0,
-              "%s %s",
-              "Dynamic array as payload is forbided for now (FIXME?).",
-              "Reference to dynamic array is allowed.");
-  *payload = xbt_malloc(*payload_size);
-  TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload));
-
+  if ((*msgtype)->ctn_type) {
+    *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
+    xbt_assert2(*payload_size > 0,
+               "%s %s",
+               "Dynamic array as payload is forbided for now (FIXME?).",
+               "Reference to dynamic array is allowed.");
+    *payload = xbt_malloc(*payload_size);
+    TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload));
+  } else {
+    *payload = NULL;
+    *payload_size = 0;
+  }
   return no_error;
 }
 
@@ -302,8 +312,8 @@ gras_msg_wait(double           timeout,
   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_get("gras_msg");
   int cpt;
   s_gras_msg_t msg;
+  gras_socket_t expeditor_res = NULL;
   
-  *expeditor = NULL;
   payload_got = NULL;
 
   if (!msgt_want)
@@ -316,7 +326,8 @@ gras_msg_wait(double           timeout,
 
   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
     if (msg.type->code == msgt_want->code) {
-      *expeditor = msg.expeditor;
+      if (expeditor)
+        *expeditor = msg.expeditor;
       memcpy(payload, msg.payload, msg.payload_size);
       free(msg.payload);
       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
@@ -326,9 +337,11 @@ gras_msg_wait(double           timeout,
   }
 
   while (1) {
-    TRY(gras_trp_select(timeout - now + start, expeditor));
-    TRY(gras_msg_recv(*expeditor, &msgt_got, &payload_got, &payload_size_got));
+    TRY(gras_trp_select(timeout - now + start, &expeditor_res));
+    TRY(gras_msg_recv(expeditor_res, &msgt_got, &payload_got, &payload_size_got));
     if (msgt_got->code == msgt_want->code) {
+      if (expeditor)
+       *expeditor=expeditor_res;
       memcpy(payload, payload_got, payload_size_got);
       free(payload_got);
       VERB0("Got waited message");
@@ -336,9 +349,9 @@ gras_msg_wait(double           timeout,
     }
 
     /* not expected msg type. Queue it for later */
-    msg.expeditor = *expeditor;
-    msg.type      =  msgt_got;
-    msg.payload   =  payload;
+    msg.expeditor = expeditor_res;
+    msg.type      = msgt_got;
+    msg.payload   = payload;
     msg.payload_size = payload_size_got;
     xbt_dynar_push(pd->msg_queue,&msg);
     
@@ -405,8 +418,12 @@ gras_msg_handle(double timeOut) {
     errcode = gras_trp_select(timeOut, &expeditor);
     if (errcode != no_error && errcode != timeout_error)
        return errcode;
-    if (errcode != timeout_error)
-       TRY(gras_msg_recv(expeditor, &msgtype, &payload, &payload_size));
+    if (errcode == no_error) {
+       errcode = gras_msg_recv(expeditor, &msgtype, &payload, &payload_size);
+       if (errcode != no_error) 
+         RAISE2(errcode, "Error '%s' while receiving a message on select()ed socket %p",
+                xbt_error_name(errcode),expeditor);
+    }
   }
 
   if (errcode == timeout_error ) {
@@ -418,7 +435,7 @@ gras_msg_handle(double timeOut) {
           return no_error;
        } else {
           xbt_assert1(untiltimer>0, "Negative timer (%f). I'm puzzeled", untiltimer);
-          ERROR1("No timer elapsed, in contrary to expectations (next in %f sec)",
+          WARN1("No timer elapsed, in contrary to expectations (next in %f sec)",
                  untiltimer);
           return timeout_error;
        }