Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
try to port the gras simulation side to the new smx_network infrastructure (not yet...
[simgrid.git] / src / gras / Msg / gras_msg_listener.c
1 /* $Id$ */
2
3 /* Thread in charge of listening the network and queuing incoming messages  */
4
5 /* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "gras/Msg/msg_private.h"
11 #include "gras/Transport/transport_private.h"
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_msg_read, gras_msg,
13                                 "Message reader thread");
14
15 #include "xbt/ex.h"
16 #include "xbt/queue.h"
17 #include "xbt/synchro.h"
18
19 #include "gras/Transport/transport_interface.h" /* gras_select */
20
21 typedef struct s_gras_msg_listener_ {
22   xbt_queue_t incomming_messages;
23   xbt_queue_t socks_to_close;   /* let the listener close the sockets, since it may be selecting on them. Darwin don't like this trick */
24   gras_socket_t wakeup_sock_listener_side;
25   gras_socket_t wakeup_sock_master_side;
26   xbt_thread_t listener;
27 } s_gras_msg_listener_t;
28
29 static void do_close_socket(gras_socket_t sock) {
30   if (sock->plugin->socket_close)
31      (*sock->plugin->socket_close) (sock);
32    /* free the memory */
33    if (sock->peer_name)
34      free(sock->peer_name);
35    free(sock);
36 }
37 static void listener_function(void *p)
38 {
39   gras_msg_listener_t me = (gras_msg_listener_t) p;
40   s_gras_msg_t msg;
41   xbt_ex_t e;
42   gras_msgtype_t msg_wakeup_listener_t =
43     gras_msgtype_by_name("_wakeup_listener");
44   DEBUG0("I'm the listener");
45   while (1) {
46     DEBUG0("Selecting");
47     msg.expe = gras_trp_select(-1);
48     DEBUG0("Select returned something");
49     gras_msg_recv(msg.expe, &msg);
50     if (msg.type != msg_wakeup_listener_t)
51       xbt_queue_push(me->incomming_messages, &msg);
52     else {
53       char got = *(char *) msg.payl;
54       if (got == '1') {
55         VERB0("Asked to get awake");
56         free(msg.payl);
57       } else {
58         VERB0("Asked to die");
59         //        gras_socket_close(me->wakeup_sock_listener_side);
60         free(msg.payl);
61         return;
62       }
63     }
64     /* empty the list of sockets to trash */
65     TRY {
66       while (1) {
67         gras_socket_t sock;
68         xbt_queue_shift_timed(me->socks_to_close, &sock, 0);
69         do_close_socket(sock);
70       }
71     }
72     CATCH(e) {
73       if (e.category != timeout_error)
74         RETHROW;
75       xbt_ex_free(e);
76     }
77   }
78 }
79
80 gras_msg_listener_t gras_msg_listener_launch(xbt_queue_t msg_exchange)
81 {
82   gras_msg_listener_t arg = xbt_new0(s_gras_msg_listener_t, 1);
83   int my_port;
84
85   DEBUG0("Launch listener");
86   arg->incomming_messages = msg_exchange;
87   arg->socks_to_close = xbt_queue_new(0, sizeof(int));
88
89   /* get a free socket for the receiving part of the listener, taking care that it does not get saved as "myport" number */
90   my_port =
91     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport;
92   arg->wakeup_sock_listener_side =
93     gras_socket_server_range(5000, 6000, -1, 0);
94   ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport =
95     my_port;
96   /* Connect the other part of the socket */
97   arg->wakeup_sock_master_side =
98     gras_socket_client(gras_os_myname(),
99                        gras_socket_my_port(arg->wakeup_sock_listener_side));
100
101   /* declare the message used to awake the listener from the master */
102   gras_msgtype_declare("_wakeup_listener", gras_datadesc_by_name("char"));
103
104   /* actually start the thread */
105   arg->listener = xbt_thread_create("listener", listener_function, arg,0/*not joinable*/);
106   gras_os_sleep(0);             /* TODO: useless? give the listener a chance to initialize even if the main is empty and we cancel it right afterward */
107   return arg;
108 }
109
110 #include "gras/Virtu/virtu_private.h"   /* procdata_t content */
111 void gras_msg_listener_shutdown()
112 {
113   gras_procdata_t *pd = gras_procdata_get();
114   char kill = '0';
115   DEBUG0("Listener quit");
116
117   if (pd->listener)
118     gras_msg_send(pd->listener->wakeup_sock_master_side, "_wakeup_listener",
119           &kill);
120
121   xbt_thread_join(pd->listener->listener);
122
123   //  gras_socket_close(pd->listener->wakeup_sock_master_side); FIXME: uncommenting this leads to deadlock at terminaison
124   xbt_queue_free(&pd->listener->incomming_messages);
125   xbt_queue_free(&pd->listener->socks_to_close);
126   xbt_free(pd->listener);
127 }
128
129 void gras_msg_listener_awake()
130 {
131   gras_procdata_t *pd;
132   char c = '1';
133
134   DEBUG0("Awaking the listener");
135   pd = gras_procdata_get();
136   if (pd->listener) {
137     gras_msg_send(pd->listener->wakeup_sock_master_side, "_wakeup_listener",
138                   &c);
139   }
140 }
141
142 void gras_msg_listener_close_socket(gras_socket_t sock)
143 {
144   gras_procdata_t *pd = gras_procdata_get();
145   if (pd->listener) {
146     xbt_queue_push(pd->listener->socks_to_close, &sock);
147     gras_msg_listener_awake();
148   } else {
149     /* do it myself */
150     do_close_socket(sock);
151   }
152 }