From: mquinson Date: Sun, 12 Apr 2009 01:04:45 +0000 (+0000) Subject: do not close sockets from main thread, but from listener (queue added for that) becau... X-Git-Tag: v3.3~4 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/2940fe68f1f180b7d8cca1e80120f0d86c480878 do not close sockets from main thread, but from listener (queue added for that) because the listener is selecting on that sock, and it makes darwin angry to close selected sockets git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6242 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/gras/Msg/gras_msg_listener.c b/src/gras/Msg/gras_msg_listener.c index 94d7863e4b..c5dcf13116 100644 --- a/src/gras/Msg/gras_msg_listener.c +++ b/src/gras/Msg/gras_msg_listener.c @@ -19,6 +19,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read,gras_msg,"Message reader thread"); typedef struct s_gras_msg_listener_ { xbt_queue_t incomming_messages; + xbt_queue_t socks_to_close; /* let the listener close the sockets, since it may be selecting on them. 
Darwin doesn't like this trick */ gras_socket_t wakeup_sock_listener_side; gras_socket_t wakeup_sock_master_side; xbt_thread_t listener; @@ -27,6 +28,7 @@ typedef struct s_gras_msg_listener_ { static void listener_function(void *p) { gras_msg_listener_t me = (gras_msg_listener_t)p; s_gras_msg_t msg; + xbt_ex_t e; gras_msgtype_t msg_wakeup_listener_t = gras_msgtype_by_name("_wakeup_listener"); DEBUG0("I'm the listener"); while (1) { @@ -39,6 +41,21 @@ static void listener_function(void *p) { else { DEBUG0("Asked to get awake"); } + /* empty the list of sockets to trash */ + TRY { + while (1) { + int sock; + xbt_queue_shift_timed(me->socks_to_close,&sock,0); + if(tcp_close(sock) < 0) { + WARN3("error while closing tcp socket %d: %d (%s)\n", + sock, sock_errno, sock_errstr(sock_errno)); + } + } + } CATCH(e) { + if (e.category != timeout_error) + RETHROW; + xbt_ex_free(e); + } } } @@ -49,6 +66,7 @@ gras_msg_listener_launch(xbt_queue_t msg_exchange){ DEBUG0("Launch listener"); arg->incomming_messages = msg_exchange; + arg->socks_to_close = xbt_queue_new(0,sizeof(int)); /* get a free socket for the receiving part of the listener, taking care that it does not get saved as "myport" number */ my_port = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport; @@ -70,6 +88,7 @@ void gras_msg_listener_shutdown(gras_msg_listener_t l) { DEBUG0("Listener quit"); xbt_thread_cancel(l->listener); xbt_queue_free(&l->incomming_messages); + xbt_queue_free(&l->socks_to_close); xbt_free(l); } @@ -84,3 +103,13 @@ void gras_msg_listener_awake(){ gras_msg_send(pd->listener->wakeup_sock_master_side,"_wakeup_listener",&c); } } +void gras_msg_listener_close_socket(int sd) { + gras_procdata_t *pd = gras_procdata_get(); + if (pd->listener) { + xbt_queue_push(pd->listener->socks_to_close,&sd); + gras_msg_listener_awake(); + } else { + /* do it myself */ + tcp_close(sd); + } +} diff --git a/src/gras/Msg/msg_interface.h b/src/gras/Msg/msg_interface.h index ebef2be7ae..ad28838d35 
100644 --- a/src/gras/Msg/msg_interface.h +++ b/src/gras/Msg/msg_interface.h @@ -43,7 +43,6 @@ typedef struct { xbt_fifo_t msg_to_receive_queue_meas; /* elm type: s_gras_msg_t */ xbt_queue_t msg_received; - } s_gras_msg_procdata_t,*gras_msg_procdata_t; @@ -51,7 +50,8 @@ void gras_msg_send_namev(gras_socket_t sock, const char *namev, void *payload); void gras_msg_listener_awake(void); - +void gras_msg_listener_close_socket(int sd); + #define GRAS_PROTOCOL_VERSION '\0'; diff --git a/src/gras/Transport/transport_plugin_tcp.c b/src/gras/Transport/transport_plugin_tcp.c index 8b58f553c5..c16da0f830 100644 --- a/src/gras/Transport/transport_plugin_tcp.c +++ b/src/gras/Transport/transport_plugin_tcp.c @@ -15,6 +15,7 @@ #include "xbt/sysdep.h" #include "xbt/ex.h" #include "gras/Transport/transport_private.h" +#include "gras/Msg/msg_interface.h" /* listener_close_socket */ /* FIXME maybe READV is sometime a good thing? */ #undef HAVE_READV @@ -249,28 +250,9 @@ static void gras_trp_sock_socket_close(gras_socket_t sock){ if (!sock) return; /* close only once */ VERB1("close tcp connection %d", sock->sd); - - /* FIXME: no pipe in GRAS so far - if(!FD_ISSET(sd, &connectedPipes)) { - if(shutdown(sd, 2) < 0) { - GetNWSLock(&lock); - tmp_errno = errno; - ReleaseNWSLock(&lock); - - / * The other side may have beaten us to the reset. * / - if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) { - WARN1("CloseSocket: shutdown error %d\n", tmp_errno); - } - } - } */ - - /* close the socket */ - if(tcp_close(sock->sd) < 0) { - WARN3("error while closing tcp socket %d: %d (%s)\n", - sock->sd, sock_errno, sock_errstr(sock_errno)); - } - + /* ask the listener to close the socket */ + gras_msg_listener_close_socket(sock->sd); } /************************************/ /****[ end of SOCKET MANAGEMENT ]****/