Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
New function: gras_msg_wait_ext (for a finer control of accepted messages); introduce...
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 25 Oct 2005 19:59:11 +0000 (19:59 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 25 Oct 2005 19:59:11 +0000 (19:59 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1835 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Msg/msg.c
src/gras/Msg/msg_private.h
src/gras/Msg/rl_msg.c
src/gras/Msg/sg_msg.c

index 6281d39..a27a327 100644 (file)
@@ -205,30 +205,32 @@ gras_msgtype_t gras_msgtype_by_id(int id) {
 /** \brief Waits for a message to come in over a given socket. 
  *
  * @param timeout: How long should we wait for this message.
- * @param msgt_want: type of awaited msg
- * @param[out] expeditor: where to create a socket to answer the incomming message
- * @param[out] payload: where to write the payload of the incomming message
- * @return the error code (or no_error).
+ * @param msgt_want: type of awaited msg (or NULL if I'm enclined to accept any message)
+ * @param expe_want: awaited expeditot (match on hostname, not port; NULL if not relevant)
+ * @param payl_filter: function returning true or false when passed a payload. Messages for which it returns false are not selected. (NULL if not relevant)
+ * @param filter_ctx: context passed as second argument of the filter (a pattern to match?)
+ * @param[out] msgt_got: where to write the descriptor of the message we got
+ * @param[out] expe_got: where to create a socket to answer the incomming message
+ * @param[out] payl_got: where to write the payload of the incomming message
  *
  * Every message of another type received before the one waited will be queued
  * and used by subsequent call to this function or gras_msg_handle().
  */
+
 void
-gras_msg_wait(double           timeout,    
-             gras_msgtype_t   msgt_want,
-             gras_socket_t   *expeditor,
-             void            *payload) {
+gras_msg_wait_ext(double           timeout,    
+                 gras_msgtype_t   msgt_want,
+                 gras_socket_t    expe_want,
+                 int_f_pvoid_pvoid_t payl_filter,
+                 void               *filter_ctx, 
+                 gras_msgtype_t  *msgt_got,
+                 gras_socket_t   *expe_got,
+                 void            *payl_got) {
 
-  gras_msgtype_t msgt_got;
-  void *payload_got;
-  int payload_size_got;
+  s_gras_msg_t msg;
   double start, now;
   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
   int cpt;
-  s_gras_msg_t msg;
-  gras_socket_t expeditor_res = NULL;
-  
-  payload_got = NULL;
 
   xbt_assert0(msgt_want,"Cannot wait for the NULL message");
 
@@ -237,11 +239,18 @@ gras_msg_wait(double           timeout,
   start = now = gras_os_time();
 
   xbt_dynar_foreach(pd->msg_queue,cpt,msg){
-    if (msg.type->code == msgt_want->code) {
-      if (expeditor)
-        *expeditor = msg.expeditor;
-      memcpy(payload, msg.payload, msg.payload_size);
-      free(msg.payload);
+    if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
+        && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
+                                    gras_socket_peer_name(expe_want))))
+        && (!payl_filter || payl_filter(msg.payl,filter_ctx))) {
+
+      if (expe_got)
+        *expe_got = msg.expe;
+      if (msgt_got)
+       *msgt_got = msg.type;
+      if (payl_got) 
+       memcpy(payl_got, msg.payl, msg.payl_size);
+      free(msg.payl);
       xbt_dynar_cursor_rm(pd->msg_queue, &cpt);
       VERB0("The waited message was queued");
       return;
@@ -249,22 +258,27 @@ gras_msg_wait(double           timeout,
   }
 
   while (1) {
-    expeditor_res = gras_trp_select(timeout - now + start);
-    gras_msg_recv(expeditor_res, &msgt_got, &payload_got, &payload_size_got);
-    if (msgt_got->code == msgt_want->code) {
-      if (expeditor)
-       *expeditor=expeditor_res;
-      memcpy(payload, payload_got, payload_size_got);
-      free(payload_got);
-      VERB0("Got waited message");
+    memset(&msg,sizeof(msg),0);
+
+    msg.expe = gras_trp_select(timeout - now + start);
+    gras_msg_recv(msg.expe, &msg);
+
+    if ( (   !msgt_want || (msg.type->code == msgt_want->code)) 
+        && (!expe_want || (!strcmp( gras_socket_peer_name(msg.expe),
+                                    gras_socket_peer_name(expe_want))))
+        && (!payl_filter || payl_filter(msg.payl,filter_ctx))) {
+
+      if (expe_got)
+       *expe_got=msg.expe;
+      if (msgt_got)
+       *msgt_got = msg.type;
+      if (payl_got) 
+       memcpy(payl_got, msg.payl, msg.payl_size);
+      free(msg.payl);
       return;
     }
 
     /* not expected msg type. Queue it for later */
-    msg.expeditor = expeditor_res;
-    msg.type      = msgt_got;
-    msg.payload   = payload;
-    msg.payload_size = payload_size_got;
     xbt_dynar_push(pd->msg_queue,&msg);
     
     now=gras_os_time();
@@ -276,6 +290,38 @@ gras_msg_wait(double           timeout,
 
   THROW_IMPOSSIBLE;
 }
+/** \brief Waits for a message to come in over a given socket. 
+ *
+ * @param timeout: How long should we wait for this message.
+ * @param msgt_want: type of awaited msg
+ * @param[out] expeditor: where to create a socket to answer the incomming message
+ * @param[out] payload: where to write the payload of the incomming message
+ * @return the error code (or no_error).
+ *
+ * Every message of another type received before the one waited will be queued
+ * and used by subsequent call to this function or gras_msg_handle().
+ */
+void
+gras_msg_wait(double           timeout,    
+             gras_msgtype_t   msgt_want,
+             gras_socket_t   *expeditor,
+             void            *payload) {
+
+  return gras_msg_wait_ext(timeout,
+                          msgt_want, NULL,      NULL, NULL,
+                          NULL,      expeditor, payload);
+}
+
+
+/** \brief Send the data pointed by \a payload as a message of type
+ * \a msgtype to the peer \a sock */
+void
+gras_msg_send(gras_socket_t   sock,
+             gras_msgtype_t  msgtype,
+             void           *payload) {
+
+  gras_msg_send_ext(sock, e_gras_msg_kind_oneway, msgtype, payload);
+}
 
 /** @brief Handle an incomming message or timer (or wait up to \a timeOut seconds)
  *
@@ -294,8 +340,7 @@ gras_msg_handle(double timeOut) {
   s_gras_msg_t    msg;
   gras_socket_t   expeditor=NULL;
   void           *payload=NULL;
-  int             payload_size;
-  gras_msgtype_t  msgtype;
+  gras_msgtype_t  msgtype=NULL;
 
   gras_msg_procdata_t pd=(gras_msg_procdata_t)gras_libdata_by_id(gras_msg_libdata_id);
   gras_cblist_t  *list=NULL;
@@ -324,9 +369,9 @@ gras_msg_handle(double timeOut) {
   if (xbt_dynar_length(pd->msg_queue)) {
     DEBUG0("Get a message from the queue");
     xbt_dynar_shift(pd->msg_queue,&msg);
-    expeditor = msg.expeditor;
+    expeditor = msg.expe;
     msgtype   = msg.type;
-    payload   = msg.payload;
+    payload   = msg.payl;
   } else {
     TRY {
       expeditor = gras_trp_select(timeOut);
@@ -339,9 +384,12 @@ gras_msg_handle(double timeOut) {
 
     if (!timeouted) {
       TRY {
-       gras_msg_recv(expeditor, &msgtype, &payload, &payload_size);
+       /* FIXME: if not the right kind, queue it and recall ourself or goto >:-) */
+       gras_msg_recv(expeditor, &msg);
+       msgtype   = msg.type;
+       payload   = msg.payl;
       } CATCH(e) {
-       RETHROW1("Error caught  while receiving a message on select()ed socket %p: %s",
+       RETHROW1("Error caught while receiving a message on select()ed socket %p: %s",
                 expeditor);
       }
     }
index 20f28c1..a144580 100644 (file)
@@ -32,12 +32,28 @@ extern char _GRAS_header[6];
 
 extern int gras_msg_libdata_id; /* The identifier of our libdata */
  
+typedef enum {
+  e_gras_msg_kind_unknown = 0,
+  e_gras_msg_kind_oneway  = 1
+  /* future:
+       method call (answer expected; sessionID attached)
+       successful return (usual datatype attached, with sessionID)
+       error return (payload = exception)
+       [+ call cancel, and others]
+     even after: 
+       forwarding request and other application level routing stuff
+       group communication
+  */
+  
+} e_gras_msg_kind_t;
 /** @brief Message instance */
 typedef struct {
-  gras_socket_t   expeditor;
-  gras_msgtype_t  type;
-  void           *payload;
-  int             payload_size;
+    gras_socket_t   expe;
+  e_gras_msg_kind_t kind;
+    gras_msgtype_t  type;
+    void           *payl;
+    int             payl_size;
 } s_gras_msg_t, *gras_msg_t;
 
 /**
@@ -60,10 +76,13 @@ extern xbt_set_t _gras_msgtype_set; /* of gras_msgtype_t */
 void gras_msgtype_free(void *msgtype);
 
 
-void gras_msg_recv(gras_socket_t    sock,
-                  gras_msgtype_t  *msgtype,
-                  void           **payload,
-                  int             *payload_size);
+/* functions to extract msg from socket or put it on wire (depend RL vs SG) */
+void gras_msg_recv(gras_socket_t   sock,
+                  gras_msg_t      msg/*OUT*/);
+void gras_msg_send_ext(gras_socket_t   sock,
+                    e_gras_msg_kind_t kind,
+                      gras_msgtype_t  msgtype,
+                      void           *payload);
 
 /**
  * gras_cblist_t:
index 61405d0..bfbd5e4 100644 (file)
 XBT_LOG_EXTERNAL_CATEGORY(gras_msg);
 XBT_LOG_DEFAULT_CATEGORY(gras_msg);
 
-/** \brief Send the data pointed by \a payload as a message of type
- * \a msgtype to the peer \a sock */
-void
-gras_msg_send(gras_socket_t   sock,
-             gras_msgtype_t  msgtype,
-             void           *payload) {
+void gras_msg_send_ext(gras_socket_t   sock,
+                    e_gras_msg_kind_t kind,
+                      gras_msgtype_t  msgtype,
+                      void           *payload) {
 
   static gras_datadesc_type_t string_type=NULL;
-
+  char c_kind=(char)kind;
+  
   if (!msgtype)
     THROW0(arg_error,0,
           "Cannot send the NULL message (did msgtype_by_name fail?)");
@@ -37,20 +36,20 @@ gras_msg_send(gras_socket_t   sock,
   DEBUG3("send '%s' to %s:%d", msgtype->name, 
         gras_socket_peer_name(sock),gras_socket_peer_port(sock));
   gras_trp_send(sock, _GRAS_header, 6, 1 /* stable */);
+  gras_trp_send(sock, &c_kind,      1, 1 /* stable */);
 
   gras_datadesc_send(sock, string_type,   &msgtype->name);
   if (msgtype->ctn_type)
     gras_datadesc_send(sock, msgtype->ctn_type, payload);
   gras_trp_flush(sock);
 }
+
 /*
  * receive the next message on the given socket.  
  */
 void
 gras_msg_recv(gras_socket_t    sock,
-             gras_msgtype_t  *msgtype,
-             void           **payload,
-             int             *payload_size) {
+             gras_msg_t       msg) {
 
   xbt_ex_t e;
   static gras_datadesc_type_t string_type=NULL;
@@ -58,6 +57,7 @@ gras_msg_recv(gras_socket_t    sock,
   int cpt;
   int r_arch;
   char *msg_name=NULL;
+  char c_kind;
 
   xbt_assert1(!gras_socket_is_meas(sock), 
              "Asked to receive a message on the measurement socket %p", sock);
@@ -68,6 +68,8 @@ gras_msg_recv(gras_socket_t    sock,
   
   TRY {
     gras_trp_recv(sock, header, 6);
+    gras_trp_recv(sock, &c_kind, 1);
+    msg->kind=(e_gras_msg_kind_t)c_kind;
   } CATCH(e) {
     RETHROW1("Exception caught while trying to get the mesage header on socket %p: %s",
             sock);
@@ -86,7 +88,7 @@ gras_msg_recv(gras_socket_t    sock,
 
   gras_datadesc_recv(sock, string_type, r_arch, &msg_name);
   TRY {
-    *msgtype = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,msg_name);
+    msg->type = (gras_msgtype_t)xbt_set_get_by_name(_gras_msgtype_set,msg_name);
   } CATCH(e) {
     /* FIXME: Survive unknown messages */
     RETHROW1("Exception caught while retrieving the type associated to messages '%s' : %s",
@@ -94,16 +96,16 @@ gras_msg_recv(gras_socket_t    sock,
   }
   free(msg_name);
 
-  if ((*msgtype)->ctn_type) {
-    *payload_size=gras_datadesc_size((*msgtype)->ctn_type);
-    xbt_assert2(*payload_size > 0,
+  if (msg->type->ctn_type) {
+    msg->payl_size=gras_datadesc_size(msg->type->ctn_type);
+    xbt_assert2(msg->payl_size > 0,
                "%s %s",
                "Dynamic array as payload is forbided for now (FIXME?).",
                "Reference to dynamic array is allowed.");
-    *payload = xbt_malloc(*payload_size);
-    gras_datadesc_recv(sock, (*msgtype)->ctn_type, r_arch, *payload);
+    msg->payl = xbt_malloc(msg->payl_size);
+    gras_datadesc_recv(sock, msg->type->ctn_type, r_arch, msg->payl);
   } else {
-    *payload = NULL;
-    *payload_size = 0;
+    msg->payl = NULL;
+    msg->payl_size = 0;
   }
 }
index adaa618..93a9ae1 100644 (file)
 #include "gras/Transport/transport_interface.h" /* gras_trp_chunk_send/recv */
 #include "gras/Transport/transport_private.h" /* sock->data */
 
-/** \brief Send the data pointed by \a payload as a message of type
- * \a msgtype to the peer \a sock */
-void
-gras_msg_send(gras_socket_t   sock,
-             gras_msgtype_t  msgtype,
-             void           *payload) {
+void gras_msg_send_ext(gras_socket_t   sock,
+                    e_gras_msg_kind_t kind,
+                      gras_msgtype_t  msgtype,
+                      void           *payload) {
 
   m_task_t task=NULL;
   gras_trp_sg_sock_data_t *sock_data = (gras_trp_sg_sock_data_t *)sock->data;
@@ -37,13 +35,14 @@ gras_msg_send(gras_socket_t   sock,
   msg->type=msgtype;
 
    
-  msg->payload_size=gras_datadesc_size(msgtype->ctn_type);
-  msg->payload=xbt_malloc(gras_datadesc_size(msgtype->ctn_type));
+  msg->payl_size=gras_datadesc_size(msgtype->ctn_type);
+  msg->payl=xbt_malloc(msg->payl_size);
   if (msgtype->ctn_type)
-    whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,payload,msg->payload);
+    whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,payload,msg->payl);
+  msg->kind = kind;
 
   task=MSG_task_create(msgtype->name,0,
-                      ((double)msg->payload_size)/(1024.0*1024.0),msg);
+                      ((double)whole_payload_size)/(1024.0*1024.0),msg);
 
   if (MSG_task_put(task, sock_data->to_host,sock_data->to_chan) != MSG_OK) 
     THROW0(system_error,0,"Problem during the MSG_task_put");
@@ -54,26 +53,26 @@ gras_msg_send(gras_socket_t   sock,
  */
 void
 gras_msg_recv(gras_socket_t    sock,
-             gras_msgtype_t  *msgtype,
-             void           **payload,
-             int             *payload_size) {
+             gras_msg_t       msg) {
 
   m_task_t task=NULL;
-  gras_msg_t msg;
+  gras_msg_t msg_got;
   gras_trp_procdata_t pd=(gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
 
   xbt_assert1(!gras_socket_is_meas(sock), 
              "Asked to receive a message on the measurement socket %p", sock);
 
+  xbt_assert0(msg,"msg is an out parameter of gras_msg_recv...");
+
   if (MSG_task_get(&task, pd->chan) != MSG_OK)
     THROW0(system_error,0,"Error in MSG_task_get()");
 
-  msg = MSG_task_get_data(task);
-  *msgtype = gras_msgtype_by_id(msg->type->code);
-  *payload = msg->payload;
-  *payload_size = msg->payload_size;
+  msg_got=MSG_task_get_data(task);
+
+  msg_got->expe= msg->expe;
+  memcpy(msg,msg_got,sizeof(s_gras_msg_t));
 
-  free(msg);
+  free(msg_got);
   if (MSG_task_destroy(task) != MSG_OK)
     THROW0(system_error,0,"Error in MSG_task_destroy()");
 }