Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
do not close sockets from main thread, but from listener (queue added for that) becau...
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Sun, 12 Apr 2009 01:04:45 +0000 (01:04 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Sun, 12 Apr 2009 01:04:45 +0000 (01:04 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6242 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/gras/Msg/gras_msg_listener.c
src/gras/Msg/msg_interface.h
src/gras/Transport/transport_plugin_tcp.c

index 94d7863..c5dcf13 100644 (file)
@@ -19,6 +19,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read,gras_msg,"Message reader thread");
 
 typedef struct s_gras_msg_listener_ {
   xbt_queue_t incomming_messages;
+  xbt_queue_t socks_to_close; /* let the listener close the sockets, since it may be selecting on them. Darwin don't like this trick */
   gras_socket_t wakeup_sock_listener_side;
   gras_socket_t wakeup_sock_master_side;
   xbt_thread_t listener;
@@ -27,6 +28,7 @@ typedef struct s_gras_msg_listener_ {
 static void listener_function(void *p) {
   gras_msg_listener_t me = (gras_msg_listener_t)p;
   s_gras_msg_t msg;
+  xbt_ex_t e;
   gras_msgtype_t msg_wakeup_listener_t = gras_msgtype_by_name("_wakeup_listener");
   DEBUG0("I'm the listener");
   while (1) {
@@ -39,6 +41,21 @@ static void listener_function(void *p) {
     else  {
        DEBUG0("Asked to get awake");
     }
+     /* empty the list of sockets to trash */
+     TRY {
+       while (1) {
+          int sock;
+          xbt_queue_shift_timed(me->socks_to_close,&sock,0);
+          if(tcp_close(sock) < 0) {
+             WARN3("error while closing tcp socket %d: %d (%s)\n",
+                   sock, sock_errno, sock_errstr(sock_errno));
+          }
+       }
+     } CATCH(e) {
+       if (e.category != timeout_error)
+         RETHROW;
+       xbt_ex_free(e);
+     }         
   }
 }
 
@@ -49,6 +66,7 @@ gras_msg_listener_launch(xbt_queue_t msg_exchange){
 
   DEBUG0("Launch listener");
   arg->incomming_messages = msg_exchange;
+  arg->socks_to_close = xbt_queue_new(0,sizeof(int));
 
   /* get a free socket for the receiving part of the listener, taking care that it does not get saved as "myport" number */
   my_port = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport;
@@ -70,6 +88,7 @@ void gras_msg_listener_shutdown(gras_msg_listener_t l) {
   DEBUG0("Listener quit");
   xbt_thread_cancel(l->listener);
   xbt_queue_free(&l->incomming_messages);
+  xbt_queue_free(&l->socks_to_close);
   xbt_free(l);
 }
 
@@ -84,3 +103,13 @@ void gras_msg_listener_awake(){
                gras_msg_send(pd->listener->wakeup_sock_master_side,"_wakeup_listener",&c);
        }
 }
+void  gras_msg_listener_close_socket(int sd) {
+   gras_procdata_t *pd = gras_procdata_get();
+   if (pd->listener) {
+      xbt_queue_push(pd->listener->socks_to_close,&sd);
+      gras_msg_listener_awake();
+   } else {
+      /* do it myself */
+      tcp_close(sd);
+   }    
+}
index ebef2be..ad28838 100644 (file)
@@ -43,7 +43,6 @@ typedef struct {
        xbt_fifo_t msg_to_receive_queue_meas; /* elm type: s_gras_msg_t */
        xbt_queue_t msg_received;
 
-
 } s_gras_msg_procdata_t,*gras_msg_procdata_t;
 
 
@@ -51,7 +50,8 @@ void gras_msg_send_namev(gras_socket_t  sock,
                         const char    *namev,
                         void          *payload);
 void gras_msg_listener_awake(void);
-
+void  gras_msg_listener_close_socket(int sd);
+     
 #define GRAS_PROTOCOL_VERSION '\0';
 
 
index 8b58f55..c16da0f 100644 (file)
@@ -15,6 +15,7 @@
 #include "xbt/sysdep.h"
 #include "xbt/ex.h"
 #include "gras/Transport/transport_private.h"
+#include "gras/Msg/msg_interface.h" /* listener_close_socket */
 
 /* FIXME maybe READV is sometime a good thing? */
 #undef HAVE_READV
@@ -249,28 +250,9 @@ static void gras_trp_sock_socket_close(gras_socket_t sock){
   if (!sock) return; /* close only once */
 
   VERB1("close tcp connection %d", sock->sd);
-
-  /* FIXME: no pipe in GRAS so far  
-  if(!FD_ISSET(sd, &connectedPipes)) {
-    if(shutdown(sd, 2) < 0) {
-      GetNWSLock(&lock);
-      tmp_errno = errno;
-      ReleaseNWSLock(&lock);
-      
-      / * The other side may have beaten us to the reset. * /
-      if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
-       WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
-      }
-    }
-  } */
-
    
-  /* close the socket */
-  if(tcp_close(sock->sd) < 0) {
-    WARN3("error while closing tcp socket %d: %d (%s)\n", 
-            sock->sd, sock_errno, sock_errstr(sock_errno));
-  }
-
+  /* ask the listener to close the socket */
+  gras_msg_listener_close_socket(sock->sd);
 }
 /************************************/
 /****[ end of SOCKET MANAGEMENT ]****/