Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
do not close sockets from main thread, but from listener (queue added for that) becau...
[simgrid.git] / src / gras / Msg / gras_msg_listener.c
index 94d7863..c5dcf13 100644 (file)
@@ -19,6 +19,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read,gras_msg,"Message reader thread");
 
 typedef struct s_gras_msg_listener_ {
   xbt_queue_t incomming_messages;
+  xbt_queue_t socks_to_close; /* let the listener close the sockets, since it may be selecting on them. Darwin don't like this trick */
   gras_socket_t wakeup_sock_listener_side;
   gras_socket_t wakeup_sock_master_side;
   xbt_thread_t listener;
@@ -27,6 +28,7 @@ typedef struct s_gras_msg_listener_ {
 static void listener_function(void *p) {
   gras_msg_listener_t me = (gras_msg_listener_t)p;
   s_gras_msg_t msg;
+  xbt_ex_t e;
   gras_msgtype_t msg_wakeup_listener_t = gras_msgtype_by_name("_wakeup_listener");
   DEBUG0("I'm the listener");
   while (1) {
@@ -39,6 +41,21 @@ static void listener_function(void *p) {
     else  {
        DEBUG0("Asked to get awake");
     }
+     /* empty the list of sockets to trash */
+     TRY {
+       while (1) {
+          int sock;
+          xbt_queue_shift_timed(me->socks_to_close,&sock,0);
+          if(tcp_close(sock) < 0) {
+             WARN3("error while closing tcp socket %d: %d (%s)\n",
+                   sock, sock_errno, sock_errstr(sock_errno));
+          }
+       }
+     } CATCH(e) {
+       if (e.category != timeout_error)
+         RETHROW;
+       xbt_ex_free(e);
+     }         
   }
 }
 
@@ -49,6 +66,7 @@ gras_msg_listener_launch(xbt_queue_t msg_exchange){
 
   DEBUG0("Launch listener");
   arg->incomming_messages = msg_exchange;
+  arg->socks_to_close = xbt_queue_new(0,sizeof(int));
 
   /* get a free socket for the receiving part of the listener, taking care that it does not get saved as "myport" number */
   my_port = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport;
@@ -70,6 +88,7 @@ void gras_msg_listener_shutdown(gras_msg_listener_t l) {
   DEBUG0("Listener quit");
   xbt_thread_cancel(l->listener);
   xbt_queue_free(&l->incomming_messages);
+  xbt_queue_free(&l->socks_to_close);
   xbt_free(l);
 }
 
@@ -84,3 +103,13 @@ void gras_msg_listener_awake(){
                gras_msg_send(pd->listener->wakeup_sock_master_side,"_wakeup_listener",&c);
        }
 }
+void  gras_msg_listener_close_socket(int sd) {
+   gras_procdata_t *pd = gras_procdata_get();
+   if (pd->listener) {
+      xbt_queue_push(pd->listener->socks_to_close,&sd);
+      gras_msg_listener_awake();
+   } else {
+      /* do it myself */
+      tcp_close(sd);
+   }    
+}