Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
various little cleanups in the gras/sg code. Mainly reindentation and more informativ...
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 12 Jul 2007 10:36:49 +0000 (10:36 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 12 Jul 2007 10:36:49 +0000 (10:36 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3743 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Msg/sg_msg.c
src/gras/Transport/sg_transport.c
src/gras/Transport/transport_interface.h
src/gras/Transport/transport_plugin_sg.c
src/gras/Transport/transport_private.h
src/gras/Virtu/sg_process.c
src/gras/Virtu/virtu_sg.h

index afd8d34..2a9c84a 100644 (file)
@@ -73,36 +73,42 @@ void gras_msg_send_ext(gras_socket_t   sock,
       whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,
                                              payload, msg->payl);
   }
       whole_payload_size = gras_datadesc_copy(msgtype->ctn_type,
                                              payload, msg->payl);
   }
-       /* put message on msg_queue */
-       msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
-       xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
-       
-       /* wake-up the receiver */
-       trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
-       xbt_fifo_push(trp_remote_proc->active_socket,sock);
-
-       SIMIX_cond_signal(trp_remote_proc->cond);
-
-       /* wait for the receiver */
-       SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
-
-       /* creates simix action and waits its ends, waits in the sender host condition*/
-       act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,msgtype->name, (double)whole_payload_size, -1);
-       SIMIX_register_action_to_condition(act,sock_data->cond);
-       SIMIX_register_condition_to_action(act,sock_data->cond);
-
+  /* put message on msg_queue */
+  msg_remote_proc = (gras_msg_procdata_t)
+    gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
+  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue,msg);
+  
+  /* wake-up the receiver */
+  trp_remote_proc = (gras_trp_procdata_t)
+    gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
+
+  xbt_fifo_push(trp_remote_proc->msg_selectable_sockets,sock);  
+  SIMIX_cond_signal(trp_remote_proc->msg_select_cond);
+  
+  /* wait for the receiver */
+  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+  
+  /* creates simix action and waits its ends, waits in the sender host
+     condition*/
+  act = SIMIX_action_communicate(SIMIX_host_self(), 
+                                sock_data->to_host,msgtype->name,
+                                (double)whole_payload_size, -1);
+  SIMIX_register_action_to_condition(act,sock_data->cond);
+  SIMIX_register_condition_to_action(act,sock_data->cond);
+  
   VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
   VERB5("Sending to %s(%s) a message type '%s' kind '%s' ID %lu",
-       SIMIX_host_get_name(sock_data->to_host),SIMIX_process_get_name(sock_data->to_process),
-       msg->type->name,e_gras_msg_kind_names[msg->kind],       msg->ID);
+       SIMIX_host_get_name(sock_data->to_host),
+       SIMIX_process_get_name(sock_data->to_process),
+       msg->type->name,e_gras_msg_kind_names[msg->kind], msg->ID);
        
        
-       SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
-       /* error treatmeant */
-
-       /* cleanup structures */
-       SIMIX_action_destroy(act);
-       SIMIX_mutex_unlock(sock_data->mutex);
+  SIMIX_cond_wait(sock_data->cond, sock_data->mutex);
+  /* error treatmeant (FIXME)*/
 
 
-       VERB0("Message sent");
+  /* cleanup structures */
+  SIMIX_action_destroy(act);
+  SIMIX_mutex_unlock(sock_data->mutex);
+  
+  VERB0("Message sent");
 
 }
 /*
 
 }
 /*
@@ -111,42 +117,45 @@ void gras_msg_send_ext(gras_socket_t   sock,
 void
 gras_msg_recv(gras_socket_t    sock,
              gras_msg_t       msg) {
 void
 gras_msg_recv(gras_socket_t    sock,
              gras_msg_t       msg) {
-
-       gras_trp_sg_sock_data_t *sock_data; 
-       gras_trp_sg_sock_data_t *remote_sock_data; 
-       gras_hostdata_t *remote_hd;
+  
+  gras_trp_sg_sock_data_t *sock_data; 
+  gras_trp_sg_sock_data_t *remote_sock_data; 
+  gras_hostdata_t *remote_hd;
   gras_msg_t msg_got;
   gras_msg_t msg_got;
-       gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
+  gras_msg_procdata_t msg_procdata = 
+    (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
 
   xbt_assert1(!gras_socket_is_meas(sock), 
              "Asked to receive a message on the measurement socket %p", sock);
 
   xbt_assert0(msg,"msg is an out parameter of gras_msg_recv...");
 
   xbt_assert1(!gras_socket_is_meas(sock), 
              "Asked to receive a message on the measurement socket %p", sock);
 
   xbt_assert0(msg,"msg is an out parameter of gras_msg_recv...");
-
-       sock_data = (gras_trp_sg_sock_data_t *)sock->data;
-       remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data;
-       DEBUG3("Remote host %s, Remote Port: %d Local port %d", SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port);
-       remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
-
-       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
-               THROW_IMPOSSIBLE;
-       }
-       DEBUG1("Size msg_to_receive buffer: %d", xbt_fifo_size(msg_procdata->msg_to_receive_queue));
+  
+  sock_data = (gras_trp_sg_sock_data_t *)sock->data;
+  remote_sock_data = ((gras_trp_sg_sock_data_t *)sock->data)->to_socket->data;
+  DEBUG3("Remote host %s, Remote Port: %d Local port %d", 
+        SIMIX_host_get_name(sock_data->to_host), sock->peer_port, sock->port);
+  remote_hd = (gras_hostdata_t *)SIMIX_host_get_data(sock_data->to_host);
+  
+  if (xbt_fifo_size(msg_procdata->msg_to_receive_queue) == 0 ) {
+    THROW_IMPOSSIBLE;
+  }
+  DEBUG1("Size msg_to_receive buffer: %d", 
+        xbt_fifo_size(msg_procdata->msg_to_receive_queue));
   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue);
   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue);
-
-       SIMIX_mutex_lock(remote_sock_data->mutex);
-/* ok, I'm here, you can continuate the communication */
-       SIMIX_cond_signal(remote_sock_data->cond);
-
-/* wait for communication end */
-       SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
-
-       msg_got->expe= msg->expe;
+  
+  SIMIX_mutex_lock(remote_sock_data->mutex);
+  /* ok, I'm here, you can continuate the communication */
+  SIMIX_cond_signal(remote_sock_data->cond);
+  
+  /* wait for communication end */
+  SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
+  
+  msg_got->expe= msg->expe;
   memcpy(msg,msg_got,sizeof(s_gras_msg_t));
   memcpy(msg,msg_got,sizeof(s_gras_msg_t));
-       xbt_free(msg_got);
-       SIMIX_mutex_unlock(remote_sock_data->mutex);
-
-       VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s",
+  xbt_free(msg_got);
+  SIMIX_mutex_unlock(remote_sock_data->mutex);
+  
+  VERB3("Received a message type '%s' kind '%s' ID %lu",// from %s",
        msg->type->name,
        e_gras_msg_kind_names[msg->kind],
        msg->ID);
        msg->type->name,
        e_gras_msg_kind_names[msg->kind],
        msg->ID);
index a767070..7299205 100644 (file)
@@ -28,94 +28,101 @@ gras_socket_t _gras_lastly_selected_socket = NULL;
  * if timeout>0 and no message there, wait at most that amount of time before giving up.
  */
 gras_socket_t gras_trp_select(double timeout) {
  * if timeout>0 and no message there, wait at most that amount of time before giving up.
  */
 gras_socket_t gras_trp_select(double timeout) {
-       gras_socket_t res;
-       gras_trp_procdata_t pd = (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
-       gras_trp_sg_sock_data_t *sockdata;
-       gras_trp_plugin_t trp;
-       gras_socket_t active_socket;
-       gras_socket_t sock_iter; /* iterating over all sockets */
-       int cursor;
-
-       DEBUG0("Trying to get the lock pd, trp_select");
-       SIMIX_mutex_lock(pd->mutex);
-       DEBUG3("select on %s@%s with timeout=%f",
-                       SIMIX_process_get_name(SIMIX_process_self()),
-                       SIMIX_host_get_name(SIMIX_host_self()),
-                       timeout);
-
-       if (xbt_fifo_size(pd->active_socket) == 0) {
-       /* message didn't arrive yet, wait */
-               SIMIX_cond_wait_timeout(pd->cond,pd->mutex,timeout);
-       }
-
-       if (xbt_fifo_size(pd->active_socket) == 0) {
-               DEBUG0("TIMEOUT");
-               SIMIX_mutex_unlock(pd->mutex);
-               THROW0(timeout_error,0,"Timeout");
-       }
-       active_socket = xbt_fifo_shift(pd->active_socket);
-       
-       /* Ok, got something. Open a socket back to the expeditor */
-
-       /* Try to reuse an already openned socket to that expeditor */
-       DEBUG1("Open sockets size %lu",xbt_dynar_length(pd->sockets));
-       xbt_dynar_foreach(pd->sockets,cursor,sock_iter) {
-               DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter);
-
-               if (sock_iter->meas || !sock_iter->outgoing)
-                       continue;
-                       /*
-               if ((sock_iter->peer_port == active_socket->port) && 
-                               (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)active_socket->data)->from_process))) {
-                               */
-               if ( (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_socket == active_socket) && (((gras_trp_sg_sock_data_t*)sock_iter->data)->to_host == SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)active_socket->data)->from_process)) ) {
-                       SIMIX_mutex_unlock(pd->mutex);
-                       return sock_iter;
-               }
-       }
-
-       /* Socket to expeditor not created yet */
-       DEBUG0("Create a socket to the expeditor");
-
-       trp = gras_trp_plugin_get_by_name("sg");
-
-       gras_trp_socket_new(1,&res);
-       res->plugin   = trp;
-
-       res->incoming  = 1;
-       res->outgoing  = 1;
-       res->accepting = 0;
-       res->sd        = -1;
-
-       res->port = -1;
-
-       /* initialize the ports */
-       //res->peer_port = active_socket->port;
-       res->port = active_socket->peer_port;
-
-       /* create sockdata */
-       sockdata = xbt_new(gras_trp_sg_sock_data_t,1);
-       sockdata->from_process = SIMIX_process_self();
-       sockdata->to_process   = ((gras_trp_sg_sock_data_t*)(active_socket->data))->from_process;
+  gras_socket_t res;
+  gras_trp_procdata_t pd = 
+    (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
+  gras_trp_sg_sock_data_t *sockdata;
+  gras_trp_plugin_t trp;
+  gras_socket_t active_socket;
+  gras_trp_sg_sock_data_t *active_socket_data;
+  gras_socket_t sock_iter; /* iterating over all sockets */
+  int cursor;
+
+  DEBUG0("Trying to get the lock pd, trp_select");
+  SIMIX_mutex_lock(pd->msg_select_mutex);
+  DEBUG3("select on %s@%s with timeout=%f",
+        SIMIX_process_get_name(SIMIX_process_self()),
+        SIMIX_host_get_name(SIMIX_host_self()),
+        timeout);
+
+  if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) {
+    /* message didn't arrive yet, wait */
+    SIMIX_cond_wait_timeout(pd->msg_select_cond,pd->msg_select_mutex,timeout);
+  }
+
+  if (xbt_fifo_size(pd->msg_selectable_sockets) == 0) {
+    DEBUG0("TIMEOUT");
+    SIMIX_mutex_unlock(pd->msg_select_mutex);
+    THROW0(timeout_error,0,"Timeout");
+  }
+  active_socket = xbt_fifo_shift(pd->msg_selectable_sockets);
+  active_socket_data = (gras_trp_sg_sock_data_t*)active_socket->data;
+  
+  /* Ok, got something. Open a socket back to the expeditor */
+
+  /* Try to reuse an already openned socket to that expeditor */
+  DEBUG1("Open sockets size %lu",xbt_dynar_length(pd->sockets));
+  xbt_dynar_foreach(pd->sockets,cursor,sock_iter) {
+    gras_trp_sg_sock_data_t *sock_data;
+    DEBUG1("Consider %p as outgoing socket to expeditor",sock_iter);
+    
+    if (sock_iter->meas || !sock_iter->outgoing)
+      continue;
+    sock_data = ((gras_trp_sg_sock_data_t*)sock_iter->data);
+
+    if ( (sock_data->to_socket == active_socket) && 
+        (sock_data->to_host == SIMIX_process_get_host(active_socket_data->from_process)) ) {
+      SIMIX_mutex_unlock(pd->msg_select_mutex);
+      return sock_iter;
+    }
+  }
+
+  /* Socket to expeditor not created yet */
+  DEBUG0("Create a socket to the expeditor");
+
+  trp = gras_trp_plugin_get_by_name("sg");
+
+  gras_trp_socket_new(1,&res);
+  res->plugin   = trp;
+
+  res->incoming  = 1;
+  res->outgoing  = 1;
+  res->accepting = 0;
+  res->sd        = -1;
+
+  res->port = -1;
+
+  /* initialize the ports */
+  //res->peer_port = active_socket->port;
+  res->port = active_socket->peer_port;
+  
+  /* create sockdata */
+  sockdata = xbt_new(gras_trp_sg_sock_data_t,1);
+  sockdata->from_process = SIMIX_process_self();
+  sockdata->to_process   = active_socket_data->from_process;
        
        
-       res->peer_port = ((gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sockdata->to_process))->myport;
-       sockdata->to_socket = active_socket;
-       /*update the peer to_socket  variable */
-       ((gras_trp_sg_sock_data_t*)active_socket->data)->to_socket = res;
-       sockdata->cond = SIMIX_cond_init();
-       sockdata->mutex = SIMIX_mutex_init();
+  res->peer_port = 
+    ((gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sockdata->to_process))->myport;
+  sockdata->to_socket = active_socket;
+  /*update the peer to_socket  variable */
+  active_socket_data->to_socket = res;
+  sockdata->cond = SIMIX_cond_init();
+  sockdata->mutex = SIMIX_mutex_init();
 
 
-       sockdata->to_host  = SIMIX_process_get_host(((gras_trp_sg_sock_data_t*)(active_socket->data))->from_process);
+  sockdata->to_host = SIMIX_process_get_host(active_socket_data->from_process);
 
 
-       res->data = sockdata;
-       res->peer_name = strdup(SIMIX_host_get_name(sockdata->to_host));
+  res->data = sockdata;
+  res->peer_name = strdup(SIMIX_host_get_name(sockdata->to_host));
 
 
-       gras_trp_buf_init_sock(res);
+  gras_trp_buf_init_sock(res);
 
 
-       DEBUG4("Create socket to process:%s(Port %d) from process: %s(Port %d)",SIMIX_process_get_name(sockdata->from_process),res->peer_port, SIMIX_process_get_name(sockdata->to_process), res->port);
+  DEBUG4("Create socket to process:%s(Port %d) from process: %s(Port %d)",
+        SIMIX_process_get_name(sockdata->from_process),
+        res->peer_port,
+        SIMIX_process_get_name(sockdata->to_process), res->port);
 
 
-       SIMIX_mutex_unlock(pd->mutex);
-       return res;
+  SIMIX_mutex_unlock(pd->msg_select_mutex);
+  return res;
 }
 
   
 }
 
   
index 379e5bc..63547d2 100644 (file)
@@ -24,7 +24,8 @@ extern int gras_opt_trp_nomoredata_on_close;
  *** Main user functions
  ***/
 /* stable if we know the storage will keep as is until the next trp_flush */
  *** Main user functions
  ***/
 /* stable if we know the storage will keep as is until the next trp_flush */
-XBT_PUBLIC(void) gras_trp_send(gras_socket_t sd, char *data, long int size, int stable);
+XBT_PUBLIC(void) gras_trp_send(gras_socket_t sd, char *data, long int size,
+                              int stable);
 XBT_PUBLIC(void) gras_trp_recv(gras_socket_t sd, char *data, long int size);
 XBT_PUBLIC(void) gras_trp_flush(gras_socket_t sd);
 
 XBT_PUBLIC(void) gras_trp_recv(gras_socket_t sd, char *data, long int size);
 XBT_PUBLIC(void) gras_trp_flush(gras_socket_t sd);
 
@@ -81,8 +82,10 @@ struct gras_trp_plugin_ {
 
   void *data; /* plugin-specific data */
  
 
   void *data; /* plugin-specific data */
  
-   /* exit is responsible for freeing data and telling the OS this plugin goes */
-   /* exit=NULL, data gets freed. (ie exit function needed only when data contains pointers) */
+   /* exit is responsible for freeing data and telling to the OS that 
+      this plugin is gone */
+   /* exit=NULL, data gets brutally free()d by the generic interface. 
+      (ie exit function needed only when data contains pointers) */
   void (*exit)(gras_trp_plugin_t);
 };
 
   void (*exit)(gras_trp_plugin_t);
 };
 
@@ -98,17 +101,23 @@ typedef struct {
   char        *name;
   unsigned int name_len;
 
   char        *name;
   unsigned int name_len;
 
-  xbt_dynar_t sockets; /* all sockets known to this process */
   int myport; /* Port on which I listen myself */
   int myport; /* Port on which I listen myself */
-  fd_set *fdset;
+   
+  xbt_dynar_t sockets; /* all sockets known to this process */
+  fd_set *fdset; /* idem, in another formalism */
 
   /* SG only elements. In RL, they are part of the OS ;) */
 
   /* SG only elements. In RL, they are part of the OS ;) */
-       smx_cond_t cond;
-       smx_mutex_t mutex;
-       smx_cond_t cond_meas;
-       smx_mutex_t mutex_meas;
-       xbt_fifo_t active_socket;
-       xbt_fifo_t active_socket_meas;
+   
+  /* List of sockets ready to be select()ed */
+  xbt_fifo_t msg_selectable_sockets; /* regular sockets  */
+  xbt_fifo_t meas_selectable_sockets;/* measurement ones */
+
+  /* Synchronisation on msg_selectable_sockets */
+  smx_cond_t msg_select_cond;
+  smx_mutex_t msg_select_mutex;
+  /* Synchronisation on meas_selectable_sockets */
+  smx_cond_t meas_select_cond;
+  smx_mutex_t meas_select_mutex;
    
 } s_gras_trp_procdata_t,*gras_trp_procdata_t;
 
    
 } s_gras_trp_procdata_t,*gras_trp_procdata_t;
 
index ee6de67..b46e05e 100644 (file)
@@ -102,7 +102,8 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self,
 
   /* make sure this socket will reach someone */
   if (!(peer=SIMIX_host_get_by_name(sock->peer_name))) 
 
   /* make sure this socket will reach someone */
   if (!(peer=SIMIX_host_get_by_name(sock->peer_name))) 
-    THROW1(mismatch_error,0,"Can't connect to %s: no such host.\n",sock->peer_name);
+    THROW1(mismatch_error,0,
+          "Can't connect to %s: no such host.\n",sock->peer_name);
 
   if (!(hd=(gras_hostdata_t *)SIMIX_host_get_data(peer))) 
     THROW1(mismatch_error,0,
 
   if (!(hd=(gras_hostdata_t *)SIMIX_host_get_data(peer))) 
     THROW1(mismatch_error,0,
@@ -124,30 +125,30 @@ void gras_trp_sg_socket_client(gras_trp_plugin_t self,
   if (pr.meas && !sock->meas) {
     THROW2(mismatch_error,0,
           "can't connect to %s:%d in regular mode, the process listen "
   if (pr.meas && !sock->meas) {
     THROW2(mismatch_error,0,
           "can't connect to %s:%d in regular mode, the process listen "
-          "in meas mode on this port",sock->peer_name,sock->peer_port);
+          "in measurement mode on this port",sock->peer_name,sock->peer_port);
   }
   if (!pr.meas && sock->meas) {
     THROW2(mismatch_error,0,
   }
   if (!pr.meas && sock->meas) {
     THROW2(mismatch_error,0,
-          "can't connect to %s:%d in meas mode, the process listen "
+          "can't connect to %s:%d in measurement mode, the process listen "
           "in regular mode on this port",sock->peer_name,sock->peer_port);
   }
   /* create the socket */
   data = xbt_new(gras_trp_sg_sock_data_t,1);
   data->from_process = SIMIX_process_self();
           "in regular mode on this port",sock->peer_name,sock->peer_port);
   }
   /* create the socket */
   data = xbt_new(gras_trp_sg_sock_data_t,1);
   data->from_process = SIMIX_process_self();
-       data->to_process         = pr.process;
+  data->to_process   = pr.process;
   data->to_host      = peer;
 
   data->to_host      = peer;
 
-       /* initialize mutex and condition of the socket */
-       data->mutex = SIMIX_mutex_init();
-       data->cond = SIMIX_cond_init();
-       data->to_socket = pr.socket; 
+  /* initialize mutex and condition of the socket */
+  data->mutex = SIMIX_mutex_init();
+  data->cond = SIMIX_cond_init();
+  data->to_socket = pr.socket; 
 
   sock->data = data;
   sock->incoming = 1;
 
   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
 
   sock->data = data;
   sock->incoming = 1;
 
   DEBUG5("%s (PID %d) connects in %s mode to %s:%d",
-         SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
-         sock->meas?"meas":"regular",
+        SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
+        sock->meas?"meas":"regular",
         sock->peer_name,sock->peer_port);
 }
 
         sock->peer_name,sock->peer_port);
 }
 
@@ -182,26 +183,26 @@ void gras_trp_sg_socket_server(gras_trp_plugin_t self,
           "can't listen on address %s:%d: port already in use.",
           host,sock->port);
 
           "can't listen on address %s:%d: port already in use.",
           host,sock->port);
 
-  pr.port   = sock->port;
-  pr.meas    = sock->meas;
-       pr.socket = sock;
-       pr.process = SIMIX_process_self();
+  pr.port = sock->port;
+  pr.meas = sock->meas;
+  pr.socket = sock;
+  pr.process = SIMIX_process_self();
   xbt_dynar_push(hd->ports,&pr);
   
   /* Create the socket */
   data = xbt_new(gras_trp_sg_sock_data_t,1);
   data->from_process     = SIMIX_process_self();
   data->to_process       = NULL;
   xbt_dynar_push(hd->ports,&pr);
   
   /* Create the socket */
   data = xbt_new(gras_trp_sg_sock_data_t,1);
   data->from_process     = SIMIX_process_self();
   data->to_process       = NULL;
-       data->to_host      = SIMIX_host_self();
+  data->to_host      = SIMIX_host_self();
   
   
-       data->cond = SIMIX_cond_init();
-       data->mutex = SIMIX_mutex_init();
+  data->cond = SIMIX_cond_init();
+  data->mutex = SIMIX_mutex_init();
 
   sock->data = data;
 
   VERB6("'%s' (%d) ears on %s:%d%s (%p)",
 
   sock->data = data;
 
   VERB6("'%s' (%d) ears on %s:%d%s (%p)",
-    SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
-    host,sock->port,sock->meas? " (mode meas)":"",sock);
+       SIMIX_process_get_name(SIMIX_process_self()), gras_os_getpid(),
+       host,sock->port,sock->meas? " (mode meas)":"",sock);
 
 }
 
 
 }
 
@@ -217,26 +218,25 @@ void gras_trp_sg_socket_close(gras_socket_t sock){
   xbt_assert0(hd,"Please run gras_process_init on each process");
 
   if (sock->data) {
   xbt_assert0(hd,"Please run gras_process_init on each process");
 
   if (sock->data) {
-       SIMIX_cond_destroy(((gras_trp_sg_sock_data_t*)sock->data)->cond);
-               SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t*)sock->data)->mutex);
-               free(sock->data);
-       }
+    SIMIX_cond_destroy(((gras_trp_sg_sock_data_t*)sock->data)->cond);
+    SIMIX_mutex_destroy(((gras_trp_sg_sock_data_t*)sock->data)->mutex);
+    free(sock->data);
+  }
 
   if (sock->incoming && !sock->outgoing && sock->port >= 0) {
     /* server mode socket. Unregister it from 'OS' tables */
     xbt_dynar_foreach(hd->ports, cpt, pr) {
       DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
       if (pr.port == sock->port) {
 
   if (sock->incoming && !sock->outgoing && sock->port >= 0) {
     /* server mode socket. Unregister it from 'OS' tables */
     xbt_dynar_foreach(hd->ports, cpt, pr) {
       DEBUG2("Check pr %d of %lu", cpt, xbt_dynar_length(hd->ports));
       if (pr.port == sock->port) {
-                               xbt_dynar_cursor_rm(hd->ports, &cpt);
-                               XBT_OUT;
-                               return;
+       xbt_dynar_cursor_rm(hd->ports, &cpt);
+       XBT_OUT;
+       return;
       }
     }
     WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
          sock,sock->port);
   }
       }
     }
     WARN2("socket_close called on the unknown incoming socket %p (port=%d)",
          sock,sock->port);
   }
-  XBT_OUT;
-
+  XBT_OUT;  
 }
 
 typedef struct {
 }
 
 typedef struct {
@@ -257,22 +257,23 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
   char name[256];
   static unsigned int count=0;
 
   char name[256];
   static unsigned int count=0;
 
-       smx_action_t act; /* simix action */
-       gras_trp_sg_sock_data_t *sock_data; 
-       gras_trp_procdata_t trp_remote_proc;
-       gras_msg_procdata_t msg_remote_proc;
-       gras_msg_t msg; /* message to send */
+  smx_action_t act; /* simix action */
+  gras_trp_sg_sock_data_t *sock_data; 
+  gras_trp_procdata_t trp_remote_proc;
+  gras_msg_procdata_t msg_remote_proc;
+  gras_msg_t msg; /* message to send */
 
 
-       sock_data = (gras_trp_sg_sock_data_t *)sock->data;
+  sock_data = (gras_trp_sg_sock_data_t *)sock->data;
 
 
-  xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
+  xbt_assert0(sock->meas, 
+           "SG chunk exchange shouldn't be used on non-measurement sockets");
 
 
-       SIMIX_mutex_lock(sock_data->mutex);
+  SIMIX_mutex_lock(sock_data->mutex);
   sprintf(name,"Chunk[%d]",count++);
   sprintf(name,"Chunk[%d]",count++);
-       /*initialize gras message */
-       msg = xbt_new(s_gras_msg_t,1);
-       msg->expe = sock;
-       msg->payl_size=size;
+  /*initialize gras message */
+  msg = xbt_new(s_gras_msg_t,1);
+  msg->expe = sock;
+  msg->payl_size=size;
 
   if (data) {
     msg->payl=(void*)xbt_malloc(size);
 
   if (data) {
     msg->payl=(void*)xbt_malloc(size);
@@ -280,72 +281,83 @@ void gras_trp_sg_chunk_send_raw(gras_socket_t sock,
   } else {
     msg->payl = NULL;
   }
   } else {
     msg->payl = NULL;
   }
+  
+  /* put message on msg_queue */
+  msg_remote_proc = (gras_msg_procdata_t)
+    gras_libdata_by_name_from_remote("gras_msg", sock_data->to_process);
 
 
-       /* put message on msg_queue */
-       msg_remote_proc = (gras_msg_procdata_t)gras_libdata_by_name_from_remote("gras_msg",sock_data->to_process);
-       xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
-       /* put his socket on the active_socket list */
-       trp_remote_proc = (gras_trp_procdata_t)gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
-       xbt_fifo_push(trp_remote_proc->active_socket_meas,sock);        
-       /* wake-up the receiver */
+  xbt_fifo_push(msg_remote_proc->msg_to_receive_queue_meas,msg);
 
 
-       SIMIX_cond_signal(trp_remote_proc->cond_meas);
+  /* put his socket on the selectable socket list */
+  trp_remote_proc = (gras_trp_procdata_t)
+    gras_libdata_by_name_from_remote("gras_trp",sock_data->to_process);
 
 
-       /* wait for the receiver */
-       SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
+  xbt_fifo_push(trp_remote_proc->meas_selectable_sockets,sock);        
 
 
-       /* creates simix action and waits its ends, waits in the sender host condition*/
+  /* wake-up the receiver */
+  SIMIX_cond_signal(trp_remote_proc->meas_select_cond);
+
+  /* wait for the receiver */
+  SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
+  
+  /* creates simix action and waits its ends, waits in the sender host
+     condition*/
   DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
         name, SIMIX_host_get_name(SIMIX_host_self()),
         SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size);
 
   DEBUG5("send chunk %s from %s to  %s:%d (size=%ld)",
         name, SIMIX_host_get_name(SIMIX_host_self()),
         SIMIX_host_get_name(sock_data->to_host), sock->peer_port,size);
 
-       act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host, name, size, -1);
-       SIMIX_register_action_to_condition(act,sock_data->cond);
-       SIMIX_register_condition_to_action(act,sock_data->cond);
+  act = SIMIX_action_communicate(SIMIX_host_self(), sock_data->to_host,
+                                name, size, -1);
+  SIMIX_register_action_to_condition(act,sock_data->cond);
+  SIMIX_register_condition_to_action(act,sock_data->cond);
 
 
-       SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
-       /* error treatmeant */
+  SIMIX_cond_wait(sock_data->cond,sock_data->mutex);
+  /* error treatmeant (FIXME)*/
 
 
-       /* cleanup structures */
-       SIMIX_action_destroy(act);
+  /* cleanup structures */
+  SIMIX_action_destroy(act);
 
 
-       SIMIX_mutex_unlock(sock_data->mutex);
+  SIMIX_mutex_unlock(sock_data->mutex);
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
 }
 
 int gras_trp_sg_chunk_recv(gras_socket_t sock,
-                           char *data,
-                           unsigned long int size){
-       gras_trp_sg_sock_data_t *sock_data; 
-       gras_trp_sg_sock_data_t *remote_sock_data; 
-       gras_socket_t remote_socket;
+                          char *data,
+                          unsigned long int size){
+  gras_trp_sg_sock_data_t *sock_data; 
+  gras_trp_sg_sock_data_t *remote_sock_data; 
+  gras_socket_t remote_socket;
   gras_msg_t msg_got;
   gras_msg_t msg_got;
-       gras_msg_procdata_t msg_procdata = (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
-  gras_trp_procdata_t trp_proc=(gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
+  gras_msg_procdata_t msg_procdata = 
+    (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
+  gras_trp_procdata_t trp_proc =
+    (gras_trp_procdata_t)gras_libdata_by_id(gras_trp_libdata_id);
 
 
-  xbt_assert0(sock->meas, "SG chunk exchange shouldn't be used on non-measurement sockets");
+  xbt_assert0(sock->meas, 
+           "SG chunk exchange shouldn't be used on non-measurement sockets");
        
        
-       SIMIX_mutex_lock(trp_proc->mutex_meas);
-       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
-               SIMIX_cond_wait_timeout(trp_proc->cond_meas,trp_proc->mutex_meas,60);
-       }
-       if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
-               SIMIX_mutex_unlock(trp_proc->mutex_meas);
-               THROW0(timeout_error,0,"Timeout");
-       }
-       SIMIX_mutex_unlock(trp_proc->mutex_meas);
-
-       remote_socket = xbt_fifo_shift(trp_proc->active_socket_meas);
-       remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
+  SIMIX_mutex_lock(trp_proc->meas_select_mutex);
+  if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
+    SIMIX_cond_wait_timeout(trp_proc->meas_select_cond,
+                           trp_proc->meas_select_mutex,60);
+  }
+  if (xbt_fifo_size(msg_procdata->msg_to_receive_queue_meas) == 0 ) {
+    SIMIX_mutex_unlock(trp_proc->meas_select_mutex);
+    THROW0(timeout_error,0,"Timeout");
+  }
+  SIMIX_mutex_unlock(trp_proc->meas_select_mutex);
+  
+  remote_socket = xbt_fifo_shift(trp_proc->meas_selectable_sockets);
+  remote_sock_data = (gras_trp_sg_sock_data_t *)remote_socket->data;
   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
   msg_got = xbt_fifo_shift(msg_procdata->msg_to_receive_queue_meas);
+  
+  sock_data = (gras_trp_sg_sock_data_t *)sock->data;
 
 
-       sock_data = (gras_trp_sg_sock_data_t *)sock->data;
-
-/* ok, I'm here, you can continue the communication */
-       SIMIX_cond_signal(remote_sock_data->cond);
+  /* ok, I'm here, you can continue the communication */
+  SIMIX_cond_signal(remote_sock_data->cond);
 
 
-       SIMIX_mutex_lock(remote_sock_data->mutex);
-/* wait for communication end */
-       SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
+  SIMIX_mutex_lock(remote_sock_data->mutex);
+  /* wait for communication end */
+  SIMIX_cond_wait(remote_sock_data->cond,remote_sock_data->mutex);
 
   if (msg_got->payl_size != size)
     THROW5(mismatch_error,0,
 
   if (msg_got->payl_size != size)
     THROW5(mismatch_error,0,
@@ -353,13 +365,15 @@ int gras_trp_sg_chunk_recv(gras_socket_t sock,
           msg_got->payl_size, size,
           SIMIX_host_get_name(sock_data->to_host),
           SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
           msg_got->payl_size, size,
           SIMIX_host_get_name(sock_data->to_host),
           SIMIX_host_get_name(SIMIX_host_self()), sock->peer_port);
-  if (data) {
+
+  if (data) 
     memcpy(data,msg_got->payl,size);
     memcpy(data,msg_got->payl,size);
-       }
-       if (msg_got->payl)
-               xbt_free(msg_got->payl);        
-       xbt_free(msg_got);
-       SIMIX_mutex_unlock(remote_sock_data->mutex);
-       return 0;
+  
+  if (msg_got->payl)
+    xbt_free(msg_got->payl);   
+
+  xbt_free(msg_got);
+  SIMIX_mutex_unlock(remote_sock_data->mutex);
+  return 0;
 }
 
 }
 
index becb262..6528442 100644 (file)
 
 extern int gras_trp_libdata_id; /* our libdata identifier */
 
 
 extern int gras_trp_libdata_id; /* our libdata identifier */
 
-/* The function that select returned the last time we asked. We need this because the TCP read 
-   are greedy and try to get as much data in their buffer as possible (to avoid subsequent syscalls).
+/* The function that select returned the last time we asked. We need this
+   because the TCP read are greedy and try to get as much data in their 
+   buffer as possible (to avoid subsequent syscalls).
    (measurement sockets are not buffered and thus not concerned).
   
    (measurement sockets are not buffered and thus not concerned).
   
-   So, we can get more than one message in one shoot. And when this happens, we have to handle 
-   the same socket again afterward without select()ing at all. 
+   So, we can get more than one message in one shoot. And when this happens,
+   we have to handle the same socket again afterward without select()ing at
+   all. 
  
  
-   Then, this data is not a static of the TCP driver because we want to zero it when
-   it gets closed by the user. If not, we use an already freed pointer, which is bad.
+   Then, this data is not a static of the TCP driver because we want to
+   zero it when it gets closed by the user. If not, we use an already freed 
+   pointer, which is bad.
  
  
-   It gets tricky since gras_socket_close is part of the common API, not only the RL one. */
+   It gets tricky since gras_socket_close is part of the common API, not 
+   only the RL one. */
 extern gras_socket_t _gras_lastly_selected_socket;
 
 /**
 extern gras_socket_t _gras_lastly_selected_socket;
 
 /**
@@ -56,12 +60,17 @@ typedef struct s_gras_socket  {
   int meas :1; /* true if this is an experiment socket instead of messaging */
   int recv_ok :1; /* true if it is valid to recv() on the socket (false if it is a file) */
   int valid :1; /* false if a select returned that the peer quitted, forcing us to "close" the socket */
   int meas :1; /* true if this is an experiment socket instead of messaging */
   int recv_ok :1; /* true if it is valid to recv() on the socket (false if it is a file) */
   int valid :1; /* false if a select returned that the peer quitted, forcing us to "close" the socket */
-  int moredata :1; /* TCP socket use a buffer and read operation get as much data as possible. 
-                     It is possible that several messages are received in one shoot, and select won't catch them afterward again.
-                     This boolean indicates that this is the case, so that we don't call select in that case. 
-                     Note that measurement sockets are not concerned since they use the TCP interface directly, with no buffer. */
-
-  unsigned long int buf_size; /* what to say to the OS. field here to remember it when accepting */
+  int moredata :1; /* TCP socket use a buffer and read operation get as much 
+                     data as possible. It is possible that several messages
+                     are received in one shoot, and select won't catch them 
+                     afterward again. 
+                     This boolean indicates that this is the case, so that we
+                     don't call select in that case.  Note that measurement
+                     sockets are not concerned since they use the TCP
+                     interface directly, with no buffer. */
+
+  unsigned long int buf_size; /* what to say to the OS. 
+                                Field here to remember it when accepting */
    
   int  sd; 
   int  port; /* port on this side */
    
   int  sd; 
   int  port; /* port on this side */
@@ -86,7 +95,7 @@ void gras_trp_iov_setup(gras_trp_plugin_t plug);
 void gras_trp_file_setup(gras_trp_plugin_t plug);
 void gras_trp_sg_setup(gras_trp_plugin_t plug);
 
 void gras_trp_file_setup(gras_trp_plugin_t plug);
 void gras_trp_sg_setup(gras_trp_plugin_t plug);
 
-/*
+/* FIXME: this should be solved by SIMIX
 
   I'm tired of that shit. the select in SG has to create a socket to expeditor
   manually do deal with the weirdness of the hostdata, themselves here to deal
 
   I'm tired of that shit. the select in SG has to create a socket to expeditor
   manually do deal with the weirdness of the hostdata, themselves here to deal
index f985c86..48cd273 100644 (file)
@@ -52,24 +52,25 @@ gras_process_init() {
     hd->refcount++;
   }
 
     hd->refcount++;
   }
 
-       trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
-       pd->pid = PID++;
-
-       if (SIMIX_process_self() != NULL ) {
-               pd->ppid = gras_os_getpid();
-       }
-       else pd->ppid = -1; 
-
-       trp_pd->mutex = SIMIX_mutex_init();
-       trp_pd->cond = SIMIX_cond_init();
-       trp_pd->mutex_meas = SIMIX_mutex_init();
-       trp_pd->cond_meas = SIMIX_cond_init();
-       trp_pd->active_socket = xbt_fifo_new();
-       trp_pd->active_socket_meas = xbt_fifo_new();
-
+  trp_pd = (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
+  pd->pid = PID++;
+  
+  if (SIMIX_process_self() != NULL ) {
+    pd->ppid = gras_os_getpid();
+  }
+  else pd->ppid = -1; 
+  
+  trp_pd->msg_selectable_sockets = xbt_fifo_new();
+  trp_pd->msg_select_mutex = SIMIX_mutex_init();
+  trp_pd->msg_select_cond = SIMIX_cond_init();
+  
+  trp_pd->meas_selectable_sockets = xbt_fifo_new();
+  trp_pd->meas_select_mutex = SIMIX_mutex_init();
+  trp_pd->meas_select_cond = SIMIX_cond_init();
+  
   VERB2("Creating process '%s' (%d)",
   VERB2("Creating process '%s' (%d)",
-          SIMIX_process_get_name(SIMIX_process_self()),
-          gras_os_getpid());
+       SIMIX_process_get_name(SIMIX_process_self()),
+       gras_os_getpid());
 }
 
 void
 }
 
 void
@@ -77,18 +78,23 @@ gras_process_exit() {
        xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets;
   gras_socket_t sock_iter;
   int cursor;
        xbt_dynar_t sockets = ((gras_trp_procdata_t) gras_libdata_by_name("gras_trp"))->sockets;
   gras_socket_t sock_iter;
   int cursor;
-  gras_hostdata_t *hd=(gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
-  gras_procdata_t *pd=(gras_procdata_t*)SIMIX_process_get_data(SIMIX_process_self());
+  gras_hostdata_t *hd=
+    (gras_hostdata_t *)SIMIX_host_get_data(SIMIX_host_self());
+  gras_procdata_t *pd=
+    (gras_procdata_t*)SIMIX_process_get_data(SIMIX_process_self());
+
+  gras_msg_procdata_t msg_pd=
+    (gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
+  gras_trp_procdata_t trp_pd=
+    (gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
 
 
-  gras_msg_procdata_t msg_pd=(gras_msg_procdata_t)gras_libdata_by_name("gras_msg");
-  gras_trp_procdata_t trp_pd=(gras_trp_procdata_t)gras_libdata_by_name("gras_trp");
+  SIMIX_mutex_destroy(trp_pd->msg_select_mutex);
+  SIMIX_cond_destroy(trp_pd->msg_select_cond);
+  xbt_fifo_free(trp_pd->msg_selectable_sockets);
 
 
-       SIMIX_mutex_destroy(trp_pd->mutex);
-       SIMIX_cond_destroy(trp_pd->cond);
-       xbt_fifo_free(trp_pd->active_socket);
-       SIMIX_mutex_destroy(trp_pd->mutex_meas);
-       SIMIX_cond_destroy(trp_pd->cond_meas);
-       xbt_fifo_free(trp_pd->active_socket_meas);
+  SIMIX_mutex_destroy(trp_pd->meas_select_mutex);
+  SIMIX_cond_destroy(trp_pd->meas_select_cond);
+  xbt_fifo_free(trp_pd->meas_selectable_sockets);
 
 
   xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");
 
 
   xbt_assert0(hd,"Run gras_process_init (ie, gras_init)!!");
@@ -99,13 +105,14 @@ gras_process_exit() {
   if (xbt_dynar_length(msg_pd->msg_queue))
     WARN1("process %d terminated, but some messages are still queued",
          gras_os_getpid());
   if (xbt_dynar_length(msg_pd->msg_queue))
     WARN1("process %d terminated, but some messages are still queued",
          gras_os_getpid());
-
-       /* if each process has its sockets list, we need to close them when the process finish */
-       xbt_dynar_foreach(sockets,cursor,sock_iter) {
-               VERB1("Closing the socket %p left open on exit. Maybe a socket leak?",
-                               sock_iter);
-               gras_socket_close(sock_iter);
-       }
+  
+  /* if each process has its sockets list, we need to close them when the
+          process finish */
+  xbt_dynar_foreach(sockets,cursor,sock_iter) {
+    VERB1("Closing the socket %p left open on exit. Maybe a socket leak?",
+         sock_iter);
+    gras_socket_close(sock_iter);
+  }
   if ( ! --(hd->refcount)) {
     xbt_dynar_free(&hd->ports);
     free(hd);
   if ( ! --(hd->refcount)) {
     xbt_dynar_free(&hd->ports);
     free(hd);
index 1b72191..9e2f4f5 100644 (file)
 #include "gras/Transport/transport_private.h"
 
 typedef struct {
 #include "gras/Transport/transport_private.h"
 
 typedef struct {
-  int port;  /* list of ports used by a server socket */
-       int meas;   /* (boolean) the channel is for measurements or for messages */
-       smx_process_t process;
-       gras_socket_t socket;
+   int port;  /* list of ports used by a server socket */
+   int meas;   /* (boolean) the channel is for measurements or for messages */
+   smx_process_t process;
+   gras_socket_t socket;
 } gras_sg_portrec_t;
 
 /* Data for each host */
 } gras_sg_portrec_t;
 
 /* Data for each host */
@@ -32,16 +32,14 @@ typedef struct {
 
 /* data for each socket (FIXME: find a better location for that)*/
 typedef struct {
 
 /* data for each socket (FIXME: find a better location for that)*/
 typedef struct {
-  //int from_PID;    /* process which sent this message */
-  //int to_PID;      /* process to which this message is destinated */
-       smx_process_t from_process;
-       smx_process_t to_process;
+   smx_process_t from_process;
+   smx_process_t to_process;
 
 
-  smx_host_t to_host;   /* Who's on other side */
-
-       smx_cond_t cond;
-       smx_mutex_t mutex;
-       gras_socket_t to_socket;
+   smx_host_t to_host;   /* Who's on other side */
+   
+   smx_cond_t cond;
+   smx_mutex_t mutex;
+   gras_socket_t to_socket;
 } gras_trp_sg_sock_data_t;
 
 
 } gras_trp_sg_sock_data_t;