Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
do not brutally kill the listener but ask him politely to die (extending the awaking...
[simgrid.git] / src / gras / Msg / gras_msg_listener.c
index 94d7863..27649e9 100644 (file)
@@ -19,6 +19,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read,gras_msg,"Message reader thread");
 
 typedef struct s_gras_msg_listener_ {
   xbt_queue_t incomming_messages;
+  xbt_queue_t socks_to_close; /* let the listener close the sockets, since it may be selecting on them. Darwin don't like this trick */
   gras_socket_t wakeup_sock_listener_side;
   gras_socket_t wakeup_sock_master_side;
   xbt_thread_t listener;
@@ -27,6 +28,7 @@ typedef struct s_gras_msg_listener_ {
 static void listener_function(void *p) {
   gras_msg_listener_t me = (gras_msg_listener_t)p;
   s_gras_msg_t msg;
+  xbt_ex_t e;
   gras_msgtype_t msg_wakeup_listener_t = gras_msgtype_by_name("_wakeup_listener");
   DEBUG0("I'm the listener");
   while (1) {
@@ -37,8 +39,29 @@ static void listener_function(void *p) {
     if (msg.type!=msg_wakeup_listener_t)
        xbt_queue_push(me->incomming_messages, &msg);
     else  {
-       DEBUG0("Asked to get awake");
+       char got = *(char*)msg.payl;
+       if (got == '1') {
+         VERB0("Asked to get awake");
+       } else {
+         VERB0("Asked to die");
+         return ;
+       }               
     }
+     /* empty the list of sockets to trash */
+     TRY {
+       while (1) {
+          int sock;
+          xbt_queue_shift_timed(me->socks_to_close,&sock,0);
+          if(tcp_close(sock) < 0) {
+             WARN3("error while closing tcp socket %d: %d (%s)\n",
+                   sock, sock_errno, sock_errstr(sock_errno));
+          }
+       }
+     } CATCH(e) {
+       if (e.category != timeout_error)
+         RETHROW;
+       xbt_ex_free(e);
+     }         
   }
 }
 
@@ -49,6 +72,7 @@ gras_msg_listener_launch(xbt_queue_t msg_exchange){
 
   DEBUG0("Launch listener");
   arg->incomming_messages = msg_exchange;
+  arg->socks_to_close = xbt_queue_new(0,sizeof(int));
 
   /* get a free socket for the receiving part of the listener, taking care that it does not get saved as "myport" number */
   my_port = ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport;
@@ -66,14 +90,28 @@ gras_msg_listener_launch(xbt_queue_t msg_exchange){
   return arg;
 }
 
+#include "gras/Virtu/virtu_private.h" /* procdata_t content */
 void gras_msg_listener_shutdown(gras_msg_listener_t l) {
+   gras_procdata_t *pd = gras_procdata_get();
+   char kill = '0';
   DEBUG0("Listener quit");
-  xbt_thread_cancel(l->listener);
+
+   
+   if (pd->listener) 
+     gras_msg_send(pd->listener->wakeup_sock_master_side,"_wakeup_listener",&kill);
+   
+   /* FIXME: thread_join is not implemented in SG (remove next conditional when fixed)
+    * But I guess it's not a big deal since we're terminating the thread mainly to 
+    * make it free its OS locks on darwin.
+    * darwin is definitly different from the neat & nice SG world */   
+   if (gras_if_RL()) 
+     xbt_thread_join(pd->listener->listener);
+
   xbt_queue_free(&l->incomming_messages);
+  xbt_queue_free(&l->socks_to_close);
   xbt_free(l);
 }
 
-#include "gras/Virtu/virtu_private.h" /* procdata_t content */
 void gras_msg_listener_awake(){
        gras_procdata_t *pd;
        char c = '1';
@@ -84,3 +122,13 @@ void gras_msg_listener_awake(){
                gras_msg_send(pd->listener->wakeup_sock_master_side,"_wakeup_listener",&c);
        }
 }
+void  gras_msg_listener_close_socket(int sd) {
+   gras_procdata_t *pd = gras_procdata_get();
+   if (pd->listener) {
+      xbt_queue_push(pd->listener->socks_to_close,&sd);
+      gras_msg_listener_awake();
+   } else {
+      /* do it myself */
+      tcp_close(sd);
+   }    
+}