Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
- Reduce the number of system headers loaded, overload some more system
[simgrid.git] / src / gras / Msg / msg.c
index 85e50e0..df0548b 100644 (file)
@@ -8,9 +8,12 @@
 /* This program is free software; you can redistribute it and/or modify it
    under the terms of the license (GNU LGPL) which comes with this package. */
 
-#include "Msg/msg_private.h"
+#include "gras/Msg/msg_private.h"
+#include "gras/DataDesc/datadesc_interface.h"
+#include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
+#include "gras/Virtu/virtu_interface.h"
 
-GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(msg,GRAS);
+GRAS_LOG_NEW_DEFAULT_SUBCATEGORY(msg,gras,"High level messaging");
 
 gras_set_t *_gras_msgtype_set = NULL;
 static char GRAS_header[6];
@@ -57,8 +60,8 @@ gras_msg_exit(void) {
 void gras_msgtype_free(void *t) {
   gras_msgtype_t *msgtype=(gras_msgtype_t *)t;
   if (msgtype) {
-    free(msgtype->name);
-    free(msgtype);
+    gras_free(msgtype->name);
+    gras_free(msgtype);
   }
 }
 
@@ -74,7 +77,7 @@ static char *make_namev(const char *name, short int ver) {
   if (!ver)
     return (char *)name;
 
-  namev = malloc(strlen(name)+2+3+1);
+  namev = (char*)gras_malloc(strlen(name)+2+3+1);
 
   if (namev) {
       sprintf(namev,"%s_v%d",name,ver);
@@ -129,6 +132,9 @@ gras_msgtype_declare_v(const char            *name,
                 "Message %s re-registred with another payload (%s was %s)",
                 namev,gras_datadesc_get_name(payload),
                 gras_datadesc_get_name(msgtype->ctn_type));
+
+    return no_error; /* do really ignore it */
+
   } else if (errcode == mismatch_error) {
     INFO3("Register version %d of message '%s' (payload: %s).", 
           version, name, gras_datadesc_get_name(payload));    
@@ -136,16 +142,13 @@ gras_msgtype_declare_v(const char            *name,
     return errcode; /* Was expecting for mismatch_error */
   }
 
-  /* create type anyway so that the old type gets removed from here, and
-     hopefully free'd when ref counter gets 0 */
-  if (! (msgtype = malloc(sizeof(gras_msgtype_t))) ) 
+  if (! (msgtype = gras_new(gras_msgtype_t,1)) ) 
     RAISE_MALLOC;
 
   msgtype->name = (namev == name ? strdup(name) : namev);
   msgtype->name_len = strlen(namev);
   msgtype->version = version;
   msgtype->ctn_type = payload;
-  gras_datadesc_addref(payload);
 
   TRY(gras_set_add(_gras_msgtype_set, (gras_set_elm_t*)msgtype,
                   &gras_msgtype_free));
@@ -158,30 +161,29 @@ gras_msgtype_declare_v(const char            *name,
  *
  * Retrieve a datatype description from its name
  */
-gras_error_t 
-gras_msgtype_by_name (const char     *name,
-                     gras_msgtype_t **dst) {
-  return gras_msgtype_by_namev(name,0,dst);
+gras_msgtype_t * gras_msgtype_by_name (const char *name) {
+  return gras_msgtype_by_namev(name,0);
 }
 /**
  * gras_msgtype_by_namev:
  *
  * Retrieve a datatype description from its name and version
  */
-gras_error_t
-gras_msgtype_by_namev(const char      *name,
-                     short int        version,
-                     gras_msgtype_t **dst) {
+gras_msgtype_t * gras_msgtype_by_namev(const char      *name,
+                                      short int        version) {
+  gras_msgtype_t *res;
 
   gras_error_t errcode;
   char *namev = make_namev(name,version); 
 
   errcode = gras_set_get_by_name(_gras_msgtype_set, namev,
-                                (gras_set_elm_t**)dst);
+                                (gras_set_elm_t**)&res);
+  if (errcode != no_error)
+    res = NULL;
   if (name != namev) 
-    free(namev);
-
-  return errcode;
+    gras_free(namev);
+  
+  return res;
 }
 
 /**
@@ -196,6 +198,11 @@ gras_msg_send(gras_socket_t  *sock,
 
   gras_error_t errcode;
   static gras_datadesc_type_t *string_type=NULL;
+
+  if (!msgtype)
+    RAISE0(mismatch_error,
+          "Cannot send the NULL message (did msgtype_by_name fail?)");
+
   if (!string_type) {
     string_type = gras_datadesc_by_name("string");
     gras_assert(string_type);
@@ -207,6 +214,7 @@ gras_msg_send(gras_socket_t  *sock,
 
   TRY(gras_datadesc_send(sock, string_type,   &msgtype->name));
   TRY(gras_datadesc_send(sock, msgtype->ctn_type, payload));
+  TRY(gras_trp_flush(sock));
 
   return no_error;
 }
@@ -245,16 +253,21 @@ gras_msg_recv(gras_socket_t   *sock,
         (int)header[4],gras_datadesc_arch_name(r_arch));
 
   TRY(gras_datadesc_recv(sock, string_type, r_arch, &msg_name));
-  TRY(gras_set_get_by_name(_gras_msgtype_set,
-                          msg_name,(gras_set_elm_t**)msgtype));
-  free(msg_name);
+  errcode = gras_set_get_by_name(_gras_msgtype_set,
+                                msg_name,(gras_set_elm_t**)msgtype);
+  if (errcode != no_error)
+    RAISE2(errcode,
+          "Got error %s while retrieving the type associated to messages '%s'",
+          gras_error_name(errcode),msg_name);
+  /* FIXME: Survive unknown messages */
+  gras_free(msg_name);
 
   *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
   gras_assert2(*payload_size > 0,
               "%s %s",
               "Dynamic array as payload is forbided for now (FIXME?).",
               "Reference to dynamic array is allowed.");
-  *payload = malloc(*payload_size);
+  *payload = gras_malloc(*payload_size);
   TRY(gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload));
 
   return no_error;
@@ -289,6 +302,10 @@ gras_msg_wait(double                 timeout,
   *expeditor = NULL;
   payload_got = NULL;
 
+  if (!msgt_want)
+    RAISE0(mismatch_error,
+          "Cannot wait for the NULL message (did msgtype_by_name fail?)");
+
   VERB1("Waiting for message %s",msgt_want->name);
 
   start = now = gras_os_time();
@@ -297,7 +314,7 @@ gras_msg_wait(double                 timeout,
     if (msg.type->code == msgt_want->code) {
       *expeditor = msg.expeditor;
       memcpy(payload, msg.payload, msg.payload_size);
-      free(msg.payload);
+      gras_free(msg.payload);
       gras_dynar_cursor_rm(pd->msg_queue, &cpt);
       VERB0("The waited message was queued");
       return no_error;
@@ -309,7 +326,7 @@ gras_msg_wait(double                 timeout,
     TRY(gras_msg_recv(*expeditor, &msgt_got, &payload_got, &payload_size_got));
     if (msgt_got->code == msgt_want->code) {
       memcpy(payload, payload_got, payload_size_got);
-      free(payload_got);
+      gras_free(payload_got);
       VERB0("Got waited message");
       return no_error;
     }
@@ -390,7 +407,7 @@ gras_msg_handle(double timeOut) {
          cpt+1,cb,msgtype->name);
     if ((*cb)(expeditor,payload)) {
       /* cb handled the message */
-      free(payload);
+      gras_free(payload);
       return no_error;
     }
   }
@@ -405,7 +422,7 @@ gras_cbl_free(void *data){
   gras_cblist_t *list=*(void**)data;
   if (list) {
     gras_dynar_free(list->cbs);
-    free(list);
+    gras_free(list);
   }
 }
 
@@ -429,7 +446,7 @@ gras_cb_register(gras_msgtype_t *msgtype,
   }
   if (!list) {
     /* First cb? Create room */
-    list = malloc(sizeof(gras_cblist_t));
+    list = gras_new(gras_cblist_t,1);
     if (!list)
       RAISE_MALLOC;