typedef struct s_gras_msg_listener_ {
xbt_queue_t incomming_messages;
+ xbt_queue_t socks_to_close; /* let the listener close the sockets, since it may be selecting on them. Darwin doesn't like this trick */
gras_socket_t wakeup_sock_listener_side;
gras_socket_t wakeup_sock_master_side;
xbt_thread_t listener;
static void listener_function(void *p) {
gras_msg_listener_t me = (gras_msg_listener_t)p;
s_gras_msg_t msg;
+ xbt_ex_t e;
gras_msgtype_t msg_wakeup_listener_t = gras_msgtype_by_name("_wakeup_listener");
DEBUG0("I'm the listener");
while (1) {
else {
DEBUG0("Asked to get awake");
}
+ /* empty the list of sockets to trash */
+ TRY {
+ while (1) {
+ int sock;
+ xbt_queue_shift_timed(me->socks_to_close,&sock,0);
+ if(tcp_close(sock) < 0) {
+ WARN3("error while closing tcp socket %d: %d (%s)\n",
+ sock, sock_errno, sock_errstr(sock_errno));
+ }
+ }
+ } CATCH(e) {
+ if (e.category != timeout_error)
+ RETHROW;
+ xbt_ex_free(e);
+ }
}
}
DEBUG0("Launch listener");
arg->incomming_messages = msg_exchange;
+ arg->socks_to_close = xbt_queue_new(0,sizeof(int));
/* get a free socket for the receiving part of the listener, taking care that it does not get saved as "myport" number */
my_port = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport;
DEBUG0("Listener quit");
xbt_thread_cancel(l->listener);
xbt_queue_free(&l->incomming_messages);
+ xbt_queue_free(&l->socks_to_close);
xbt_free(l);
}
gras_msg_send(pd->listener->wakeup_sock_master_side,"_wakeup_listener",&c);
}
}
+/**
+ * Close the socket descriptor sd, delegating to the listener when there is one.
+ *
+ * If the current process data has a listener, sd is pushed onto the
+ * listener's socks_to_close queue and gras_msg_listener_awake() is called;
+ * the descriptor is not closed by this function in that case.
+ * Otherwise the descriptor is closed right here, and a failure of
+ * tcp_close() is reported as a warning instead of being silently dropped.
+ */
+void gras_msg_listener_close_socket(int sd) {
+  gras_procdata_t *pd = gras_procdata_get();
+  if (pd->listener) {
+    /* the queue copies the descriptor value, so passing &sd is fine */
+    xbt_queue_push(pd->listener->socks_to_close,&sd);
+    gras_msg_listener_awake();
+  } else {
+    /* do it myself */
+    if (tcp_close(sd) < 0) {
+      WARN3("error while closing tcp socket %d: %d (%s)\n",
+            sd, sock_errno, sock_errstr(sock_errno));
+    }
+  }
+}
xbt_fifo_t msg_to_receive_queue_meas; /* elm type: s_gras_msg_t */
xbt_queue_t msg_received;
-
} s_gras_msg_procdata_t,*gras_msg_procdata_t;
const char *namev,
void *payload);
void gras_msg_listener_awake(void);
-
+void gras_msg_listener_close_socket(int sd);
+
#define GRAS_PROTOCOL_VERSION '\0';
#include "xbt/sysdep.h"
#include "xbt/ex.h"
#include "gras/Transport/transport_private.h"
+#include "gras/Msg/msg_interface.h" /* listener_close_socket */
/* FIXME maybe READV is sometime a good thing? */
#undef HAVE_READV
if (!sock) return; /* close only once */
VERB1("close tcp connection %d", sock->sd);
-
- /* FIXME: no pipe in GRAS so far
- if(!FD_ISSET(sd, &connectedPipes)) {
- if(shutdown(sd, 2) < 0) {
- GetNWSLock(&lock);
- tmp_errno = errno;
- ReleaseNWSLock(&lock);
-
- / * The other side may have beaten us to the reset. * /
- if ((tmp_errno!=ENOTCONN) && (tmp_errno!=ECONNRESET)) {
- WARN1("CloseSocket: shutdown error %d\n", tmp_errno);
- }
- }
- } */
-
- /* close the socket */
- if(tcp_close(sock->sd) < 0) {
- WARN3("error while closing tcp socket %d: %d (%s)\n",
- sock->sd, sock_errno, sock_errstr(sock_errno));
- }
-
+ /* ask the listener to close the socket */
+ gras_msg_listener_close_socket(sock->sd);
}
/************************************/
/****[ end of SOCKET MANAGEMENT ]****/