1 /* transport - low level communication */
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
12 #include "xbt/socket.h"
13 #include "xbt_modinter.h"
15 #include "xbt_socket_private.h"
16 #include "gras/Msg/msg_interface.h" /* FIXME */
/* Default log category of this file, declared as a subcategory of xbt. */
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_trp, xbt,
19 "Conveying bytes over the network");
/* Separate category, child of xbt_trp, used by the XBT_CDEBUG calls of the
 * measurement-socket functions. */
20 XBT_LOG_NEW_SUBCATEGORY(xbt_trp_meas, xbt_trp,
21 "Conveying bytes over the network without formating for perf measurements");
/* Counter tested by xbt_trp_preinit() and decremented by xbt_trp_postexit(). */
23 static short int xbt_trp_started = 0;
24 static xbt_dict_t xbt_trp_plugins; /* all registered plugins, keyed by plugin name */
25 static void xbt_trp_plugin_free(void *p); /* free one of the plugins (used as the dict's free function) */
/* Create a plugin record with xbt_new0(), give it its own copy of `name`
 * (xbt_strdup) and store it in the xbt_trp_plugins dictionary under `name`.
 *
 * name:  key under which the plugin is registered.
 * setup: plugin setup callback.
 *
 * NOTE(review): no use of `setup` is visible in this listing, which appears
 * to have lost some lines -- presumably the callback is run on the new plugin
 * before it is registered; confirm against the full file.
 */
27 void xbt_trp_plugin_new(const char *name, xbt_trp_setup_t setup)
29 xbt_trp_plugin_t plug = xbt_new0(s_xbt_trp_plugin_t, 1);
31 XBT_DEBUG("Create plugin %s", name);
33 plug->name = xbt_strdup(name);
/* NULL is passed as last argument; the dictionary itself was created with
 * xbt_trp_plugin_free as its free function in xbt_trp_preinit(). */
35 xbt_dict_set(xbt_trp_plugins, name, plug, NULL);
/* Start-up of the transport module.
 *
 * Visible steps: when xbt_trp_started is zero, create the plugin dictionary
 * (entries released through xbt_trp_plugin_free), initialise WinSock with
 * WSAStartup() on Windows builds, and register the "tcp" plugin with
 * xbt_trp_tcp_setup.
 *
 * NOTE(review): this listing appears to be missing lines (braces, the
 * declarations of wsaData and res, the preprocessor branches that follow the
 * #ifdef, and whatever updates xbt_trp_started), so the exact nesting of the
 * statements below cannot be told from here; check the full file before
 * changing the logic.
 */
38 void xbt_trp_preinit(void)
40 if (!xbt_trp_started) {
41 /* make room for all plugins */
42 xbt_trp_plugins = xbt_dict_new_homogeneous(xbt_trp_plugin_free);
44 #ifdef HAVE_WINSOCK2_H
45 /* initialize the windows mechanism */
47 WORD wVersionRequested;
50 wVersionRequested = MAKEWORD(2, 0);
52 res = WSAStartup(wVersionRequested, &wsaData);
53 xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
55 /* Confirm that the WinSock DLL supports 2.0. */
56 /* Note that if the DLL supports versions greater */
57 /* than 2.0 in addition to 2.0, it will still return */
58 /* 2.0 in wVersion since that is the version we */
61 xbt_assert(LOBYTE(wsaData.wVersion) == 2 &&
62 HIBYTE(wsaData.wVersion) == 0,
63 "Cannot find a usable WinSock DLL");
64 XBT_INFO("Found and initialized winsock2");
65 } /* The WinSock DLL is acceptable. Proceed. */
/* 0x0101 asks for WinSock 1.1; presumably this is the branch used when only
 * the older winsock header is available -- the guarding directive is not
 * visible here. */
70 res = WSAStartup(0x0101, &wsaData);
71 xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
72 XBT_INFO("Found and initialized winsock");
76 /* create the TCP transport plugin */
77 xbt_trp_plugin_new("tcp", xbt_trp_tcp_setup);
/* Shutdown of the transport module.
 *
 * Logs the current value of xbt_trp_started, then decrements it; once it
 * reaches zero the visible code calls WSACleanup() (cancelling a blocking
 * call when the reported error is WSAEINPROGRESS) and frees the plugin
 * dictionary.
 *
 * NOTE(review): the body of the `xbt_trp_started == 0` test and all closing
 * braces / preprocessor guards are missing from this listing; presumably the
 * first test returns early and the WinSock calls are compiled on Windows
 * only -- confirm against the full file.
 */
83 void xbt_trp_postexit(void)
85 XBT_DEBUG("xbt_trp value %d", xbt_trp_started);
86 if (xbt_trp_started == 0) {
90 if (--xbt_trp_started == 0) {
92 if (WSACleanup() == SOCKET_ERROR) {
93 if (WSAGetLastError() == WSAEINPROGRESS) {
94 WSACancelBlockingCall();
100 /* Delete the plugins */
101 xbt_dict_free(&xbt_trp_plugins);
/* Release one plugin record; this is the free function handed to
 * xbt_dict_new_homogeneous() when xbt_trp_plugins is created.
 *
 * p: the plugin, received as an untyped pointer.
 *
 * NOTE(review): most of the body is missing from this listing. The only
 * branch visible handles a plugin that still has `data` set: it logs that the
 * plugin lacks an exit() and that the data is freed anyway. The condition of
 * the preceding `if` and the actual release calls are not visible.
 */
105 void xbt_trp_plugin_free(void *p)
107 xbt_trp_plugin_t plug = p;
112 } else if (plug->data) {
113 XBT_DEBUG("Plugin %s lacks exit(). Free data anyway.", plug->name);
126 * Malloc a new socket with the TCP transport plugin and default parameters.
128 void xbt_socket_new(int incoming, xbt_socket_t* dst)
130 xbt_socket_new_ext(incoming, dst, xbt_trp_plugin_get_by_name("tcp"), 0, 0);
134 * xbt_socket_new_ext:
136 * Malloc a new socket.
/* Allocate a socket record with xbt_new0() and fill in its fields.
 *
 * incoming: non-zero makes the socket incoming and accepting (outgoing = 0);
 *           zero makes it outgoing only.
 * plugin:   transport plugin stored in the socket.
 * buf_size: stored as the socket's buf_size.
 *
 * NOTE(review): part of the parameter list and several statements are missing
 * from this listing. xbt_socket_new() calls this function with five
 * arguments (incoming, dst, plugin, 0, 0) and the body reads `measurement`,
 * so there are presumably a `dst` output parameter and a `measurement`
 * parameter as well; how the new socket is handed back through `dst` is not
 * visible here.
 */
138 void xbt_socket_new_ext(int incoming,
140 xbt_trp_plugin_t plugin,
141 unsigned long int buf_size,
144 xbt_socket_t sock = xbt_new0(s_xbt_socket_t, 1);
146 XBT_VERB("Create a new socket (%p)", (void *) sock);
148 sock->plugin = plugin;
/* The three direction flags are all derived from `incoming`. */
150 sock->incoming = incoming ? 1 : 0;
151 sock->outgoing = incoming ? 0 : 1;
152 sock->accepting = incoming ? 1 : 0;
153 sock->meas = measurement;
159 sock->buf_size = buf_size;
163 sock->bufdata = NULL;
/* Getter paired with xbt_socket_set_data().
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns the pointer stored by the setter -- confirm against the full file. */
170 XBT_INLINE void* xbt_socket_get_data(xbt_socket_t sock) {
/* Setter paired with xbt_socket_get_data().
 * NOTE(review): the body is not visible in this listing; presumably it
 * stores `data` in the socket -- confirm against the full file. */
174 XBT_INLINE void xbt_socket_set_data(xbt_socket_t sock, void* data) {
181 * Send a bunch of bytes from on socket
182 * (stable if we know the storage will keep as is until the next trp_flush)
184 void xbt_trp_send(xbt_socket_t sd, char *data, long int size, int stable)
186 xbt_assert(sd->outgoing, "Socket not suited for data send");
187 sd->plugin->send(sd, data, size, stable);
193 * Receive a bunch of bytes from a socket
195 void xbt_trp_recv(xbt_socket_t sd, char *data, long int size)
197 xbt_assert(sd->incoming, "Socket not suited for data receive");
198 (sd->plugin->recv) (sd, data, size);
204 * Make sure all pending communications are done
206 void xbt_trp_flush(xbt_socket_t sd)
208 if (sd->plugin->flush)
209 (sd->plugin->flush) (sd);
212 xbt_trp_plugin_t xbt_trp_plugin_get_by_name(const char *name)
214 return xbt_dict_get(xbt_trp_plugins, name);
217 int xbt_socket_my_port(xbt_socket_t sock)
219 if (!sock->plugin->my_port)
220 THROWF(unknown_error, 0, "Function my_port unimplemented in plugin %s",sock->plugin->name);
221 return sock->plugin->my_port(sock);
224 int xbt_socket_peer_port(xbt_socket_t sock)
226 if (!sock->plugin->peer_port)
227 THROWF(unknown_error, 0, "Function peer_port unimplemented in plugin %s",sock->plugin->name);
228 return sock->plugin->peer_port(sock);
231 const char *xbt_socket_peer_name(xbt_socket_t sock)
233 xbt_assert(sock->plugin);
234 return sock->plugin->peer_name(sock);
237 const char *xbt_socket_peer_proc(xbt_socket_t sock)
239 return sock->plugin->peer_proc(sock);
242 void xbt_socket_peer_proc_set(xbt_socket_t sock, char *peer_proc)
244 return sock->plugin->peer_proc_set(sock,peer_proc);
247 /** \brief Check if the provided socket is a measurement one (or a regular one)
 *
 * NOTE(review): the body of this function is not visible in this listing;
 * presumably it reports the socket's `meas` flag (the field set in
 * xbt_socket_new_ext()) -- confirm against the full file.
 */
248 int xbt_socket_is_meas(xbt_socket_t sock)
253 /** \brief Send a chunk of (random) data over a measurement socket
255 * @param peer measurement socket to use for the experiment
256 * @param timeout timeout (in seconds)
257 * @param msg_size size of each chunk sent over the socket (in bytes).
258 * @param msg_amount how many of these packets you want to send.
260 * Calls to xbt_socket_meas_send() and xbt_socket_meas_recv() on
261 * each side of the socket should be paired.
263 * The exchanged data is zeroed to make sure it's initialized, but
264 * there is no way to control what is sent (i.e., you cannot use these
265 * functions to exchange data out of band).
267 * @warning: in SimGrid version 3.1 and earlier, the numerical arguments
268 * were the total amount of data to send and the msg_size. This
269 * was changed for the fool wanting to send more than MAXINT
270 * bytes in a fat pipe.
/* NOTE(review), applying to the whole function as visible in this listing:
 *  - the THROWF below has no visible guard; if it is indeed unconditional,
 *    every statement after it is unreachable;
 *  - `timeout` is not used by any visible statement;
 *  - the declaration of `chunk`, the closing braces and the body of the last
 *    `if` (presumably the release of `chunk`) are missing from this listing;
 *  - msg_size is an unsigned long int but is printed with %ld in both debug
 *    messages; %lu would match its type.
 */
272 void xbt_socket_meas_send(xbt_socket_t peer,
273 unsigned int timeout,
274 unsigned long int msg_size,
275 unsigned long int msg_amount)
278 unsigned long int sent_sofar;
281 THROWF(unknown_error, 0, "measurement sockets were broken in this release of SimGrid and should be ported back in the future."
282 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
/* A zero-filled buffer of msg_size bytes is only allocated for the "tcp"
 * plugin; with any other plugin raw_send() receives chunk as it was
 * initialised (initialisation not visible here). */
284 if (peer->plugin == xbt_trp_plugin_get_by_name("tcp")) {
285 chunk = xbt_malloc0(msg_size);
288 xbt_assert(peer->meas,
289 "Asked to send measurement data on a regular socket");
290 xbt_assert(peer->outgoing,
291 "Socket not suited for data send (this is a server socket)");
/* Send msg_amount messages of msg_size bytes each, logging the progress
 * before each one. */
293 for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
294 XBT_CDEBUG(xbt_trp_meas,
295 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
296 sent_sofar, msg_amount, msg_size, xbt_socket_peer_name(peer),
297 xbt_socket_peer_port(peer));
298 peer->plugin->raw_send(peer, chunk, msg_size);
300 XBT_CDEBUG(xbt_trp_meas,
301 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
302 msg_amount, msg_size, xbt_socket_peer_name(peer),
303 xbt_socket_peer_port(peer));
305 if (peer->plugin == xbt_trp_plugin_get_by_name("tcp")) {
312 /** \brief Receive a chunk of data over a measurement socket
314 * Calls to xbt_socket_meas_send() and xbt_socket_meas_recv() on
315 * each side of the socket should be paired.
317 * @warning: in SimGrid version 3.1 and earlier, the numerical arguments
318 * were the total amount of data to send and the msg_size. This
319 * was changed for the fool wanting to send more than MAXINT
320 * bytes in a fat pipe.
/* NOTE(review), applying to the whole function as visible in this listing:
 *  - the THROWF below has no visible guard; if it is indeed unconditional,
 *    every statement after it is unreachable;
 *  - `timeout` is not used by any visible statement;
 *  - the declaration of `chunk`, the closing braces and the body of the last
 *    `if` (presumably the release of `chunk`) are missing from this listing;
 *  - got_sofar and msg_size are unsigned long int but are printed with %ld
 *    in both debug messages; %lu would match their type.
 */
322 void xbt_socket_meas_recv(xbt_socket_t peer,
323 unsigned int timeout,
324 unsigned long int msg_size,
325 unsigned long int msg_amount)
329 unsigned long int got_sofar;
332 THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
333 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
/* A receive buffer of msg_size bytes is only allocated for the "tcp" plugin
 * (xbt_malloc here, not the zeroing xbt_malloc0 used on the sending side). */
335 if (peer->plugin == xbt_trp_plugin_get_by_name("tcp")) {
336 chunk = xbt_malloc(msg_size);
339 xbt_assert(peer->meas,
340 "Asked to receive measurement data on a regular socket");
341 xbt_assert(peer->incoming, "Socket not suited for data receive");
/* Receive msg_amount messages of msg_size bytes each into the same buffer,
 * logging the progress before each one. */
343 for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
344 XBT_CDEBUG(xbt_trp_meas,
345 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
346 got_sofar, msg_amount, msg_size, xbt_socket_peer_name(peer),
347 xbt_socket_peer_port(peer));
348 (peer->plugin->raw_recv) (peer, chunk, msg_size);
350 XBT_CDEBUG(xbt_trp_meas,
351 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
352 got_sofar, msg_amount, msg_size, xbt_socket_peer_name(peer),
353 xbt_socket_peer_port(peer));
355 if (peer->plugin == xbt_trp_plugin_get_by_name("tcp")) {
363 * \brief Something similar to the good old accept system call.
365 * Make sure that there is someone speaking to the provided server socket.
366 * In RL, it does an accept(2) and returns the accepted socket.
367 * In SG, as accepts are useless, it returns the provided argument as result.
368 * You should thus test whether (peer != accepted) before closing both of them.
370 * You should only call this on measurement sockets. It is automatically
371 * done for regular sockets, but you usually want more control over
372 * what's going on with measurement sockets.
374 xbt_socket_t xbt_socket_meas_accept(xbt_socket_t peer)
377 THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
378 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
380 xbt_assert(peer->meas,
381 "No need to accept on non-measurement sockets (it's automatic)");
383 if (!peer->accepting) {
384 /* nothing to accept here (must be in SG) */
385 /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
389 res = (peer->plugin->socket_accept) (peer);
390 res->meas = peer->meas;
391 XBT_CDEBUG(xbt_trp_meas, "meas_accepted onto %d", res->sd);