Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid
[simgrid.git] / src / gras / Transport / transport.c
1 /* transport - low level communication                                      */
2
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 /***
10  *** Options
11  ***/
12 static int gras_opt_trp_nomoredata_on_close = 0;
13
14 #include "xbt/ex.h"
15 #include "xbt/peer.h"
16 #include "xbt/socket.h"
17 #include "xbt/xbt_socket_private.h" /* FIXME */
18 #include "portable.h"
19 #include "gras/Transport/transport_private.h"
20 #include "gras/Msg/msg_interface.h"
21
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
23                                 "Conveying bytes over the network");
24
25 /**
26  * @brief Opens a server socket and makes it ready to be listened to.
27  * @param port: port on which you want to listen
28  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
29  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
30  *
31  * In real life, you'll get a TCP socket.
32  */
33 xbt_socket_t
34 gras_socket_server_ext(unsigned short port,
35                        unsigned long int buf_size, int measurement)
36 {
37   xbt_trp_plugin_t trp;
38   xbt_socket_t sock;
39
40   XBT_DEBUG("Create a server socket from plugin %s on port %d",
41          gras_if_RL() ? "tcp" : "sg", port);
42   trp = xbt_trp_plugin_get_by_name(gras_if_SG() ? "sg" : "tcp");
43
44   /* defaults settings */
45   xbt_socket_new_ext(1,
46                      &sock,
47                      trp,
48                      (buf_size > 0 ? buf_size : 32 * 1024),
49                      measurement);
50
51   /* Call plugin socket creation function */
52   XBT_DEBUG("Prepare socket with plugin (fct=%p)", trp->socket_server);
53   TRY {
54     trp->socket_server(trp, port, sock);
55   }
56   CATCH_ANONYMOUS {
57     free(sock);
58     RETHROW;
59   }
60
61   if (!measurement)
62     ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
63         = port;
64   xbt_dynar_push(((gras_trp_procdata_t)
65                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
66                  &sock);
67
68   gras_msg_listener_awake();
69   return sock;
70 }
71
72 /**
73  * @brief Opens a server socket on any port in the given range
74  *
75  * @param minport: first port we will try
76  * @param maxport: last port we will try
77  * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
78  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
79  *
80  * If none of the provided ports works, raises the exception got when trying the last possibility
81  */
82 xbt_socket_t
83 gras_socket_server_range(unsigned short minport, unsigned short maxport,
84                          unsigned long int buf_size, int measurement)
85 {
86
87   int port;
88   xbt_socket_t res = NULL;
89   xbt_ex_t e;
90
91   for (port = minport; port < maxport; port++) {
92     TRY {
93       res = gras_socket_server_ext(port, buf_size, measurement);
94     }
95     CATCH(e) {
96       if (port == maxport)
97         RETHROW;
98       xbt_ex_free(e);
99     }
100     if (res)
101       return res;
102   }
103   THROW_IMPOSSIBLE;
104 }
105
106 /**
107  * @brief Opens a client socket to a remote host.
108  * @param host: who you want to connect to
109  * @param port: where you want to connect to on this host
110  * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
111  * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
112  *
113  * In real life, you'll get a TCP socket.
114  */
115 xbt_socket_t
116 gras_socket_client_ext(const char *host,
117                        unsigned short port,
118                        unsigned long int buf_size, int measurement)
119 {
120   xbt_trp_plugin_t trp;
121   xbt_socket_t sock;
122
123   trp = xbt_trp_plugin_get_by_name(gras_if_SG() ? "sg" : "tcp");
124
125   XBT_DEBUG("Create a client socket from plugin %s",
126          gras_if_RL()? "tcp" : "sg");
127   /* defaults settings */
128   xbt_socket_new_ext(0,
129                      &sock,
130                      trp,
131                      (buf_size > 0 ? buf_size : 32 * 1024),
132                      measurement);
133
134   /* plugin-specific */
135   TRY {
136     (*trp->socket_client) (trp, host, port, sock);
137   }
138   CATCH_ANONYMOUS {
139     free(sock);
140     RETHROW;
141   }
142   xbt_dynar_push(((gras_trp_procdata_t)
143                   gras_libdata_by_id(gras_trp_libdata_id))->sockets,
144                  &sock);
145   gras_msg_listener_awake();
146   return sock;
147 }
148
149 /**
150  * @brief Opens a server socket and make it ready to be listened to.
151  *
152  * In real life, you'll get a TCP socket.
153  */
154 xbt_socket_t gras_socket_server(unsigned short port)
155 {
156   return gras_socket_server_ext(port, 32 * 1024, 0);
157 }
158
159 /** @brief Opens a client socket to a remote host */
160 xbt_socket_t gras_socket_client(const char *host, unsigned short port)
161 {
162   return gras_socket_client_ext(host, port, 0, 0);
163 }
164
165 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
166 xbt_socket_t gras_socket_client_from_string(const char *host)
167 {
168   xbt_peer_t p = xbt_peer_from_string(host);
169   xbt_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
170   xbt_peer_free(p);
171   return res;
172 }
173
174 void gras_socket_close_voidp(void *sock)
175 {
176   gras_socket_close((xbt_socket_t) sock);
177 }
178
179 /** \brief Close socket */
180 void gras_socket_close(xbt_socket_t sock)
181 {
182   if (--sock->refcount)
183     return;
184
185   xbt_dynar_t sockets =
186       ((gras_trp_procdata_t)
187        gras_libdata_by_id(gras_trp_libdata_id))->sockets;
188   xbt_socket_t sock_iter = NULL;
189   unsigned int cursor;
190
191   XBT_IN("");
192   XBT_VERB("Close %p", sock);
193   if (sock == _gras_lastly_selected_socket) {
194     xbt_assert(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
195                 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
196
197     if (sock->moredata)
198       XBT_CRITICAL
199           ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
200     _gras_lastly_selected_socket = NULL;
201   }
202
203   /* FIXME: Issue an event when the socket is closed */
204   XBT_DEBUG("sockets pointer before %p", sockets);
205   if (sock) {
206     /* FIXME: Cannot get the dynar mutex, because it can be already locked */
207 //              _xbt_dynar_foreach(sockets,cursor,sock_iter) {
208     for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
209       _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
210       if (sock == sock_iter) {
211         XBT_DEBUG("remove sock cursor %d dize %lu\n", cursor,
212                xbt_dynar_length(sockets));
213         xbt_dynar_cursor_rm(sockets, &cursor);
214         if (sock->plugin->socket_close)
215           (*sock->plugin->socket_close) (sock);
216
217         /* free the memory */
218         free(sock);
219         XBT_OUT();
220         return;
221       }
222     }
223     XBT_WARN
224         ("Ignoring request to free an unknown socket (%p). Execution stack:",
225          sock);
226     xbt_backtrace_display_current();
227   }
228   XBT_OUT();
229 }
230
231 /*
232  * Creating procdata for this module
233  */
234 static void *gras_trp_procdata_new(void)
235 {
236   gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
237
238   res->name = xbt_strdup("gras_trp");
239   res->name_len = 0;
240   res->sockets = xbt_dynar_new_sync(sizeof(xbt_socket_t *), NULL);
241   res->myport = 0;
242
243   return (void *) res;
244 }
245
246 /*
247  * Freeing procdata for this module
248  */
249 static void gras_trp_procdata_free(void *data)
250 {
251   gras_trp_procdata_t res = (gras_trp_procdata_t) data;
252
253   xbt_dynar_free(&(res->sockets));
254   free(res->name);
255   free(res);
256 }
257
258 void gras_trp_socketset_dump(const char *name)
259 {
260   gras_trp_procdata_t procdata =
261       (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
262
263   unsigned int it;
264   xbt_socket_t s;
265
266   XBT_INFO("** Dump the socket set %s", name);
267   xbt_dynar_foreach(procdata->sockets, it, s) {
268     XBT_INFO("  %p -> %s:%d %s",
269           s, xbt_socket_peer_name(s), xbt_socket_peer_port(s),
270           s->valid ? "(valid)" : "(peer dead)");
271   }
272   XBT_INFO("** End of socket set %s", name);
273 }
274
275 /*
276  * Module registration
277  */
278 int gras_trp_libdata_id;
279 void gras_trp_register()
280 {
281   gras_trp_libdata_id =
282       gras_procdata_add("gras_trp", gras_trp_procdata_new,
283                         gras_trp_procdata_free);
284 }
285
286 int gras_os_myport(void)
287 {
288   return ((gras_trp_procdata_t)
289           gras_libdata_by_id(gras_trp_libdata_id))->myport;
290 }