1 /* transport - low level communication */
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
12 int gras_opt_trp_nomoredata_on_close = 0;
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
21 "Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas, gras_trp,
23 "Conveying bytes over the network without formating for perf measurements");
24 static short int _gras_trp_started = 0;
26 static xbt_dict_t _gras_trp_plugins; /* All registered plugins */
27 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
29 static void gras_trp_plugin_new(const char *name, gras_trp_setup_t setup)
33 gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
35 XBT_DEBUG("Create plugin %s", name);
37 plug->name = xbt_strdup(name);
42 if (e.category == mismatch_error) {
43 /* SG plugin raise mismatch when in RL mode (and vice versa) */
54 xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
57 void gras_trp_init(void)
59 if (!_gras_trp_started) {
60 /* make room for all plugins */
61 _gras_trp_plugins = xbt_dict_new();
63 #ifdef HAVE_WINSOCK2_H
64 /* initialize the windows mechanism */
66 WORD wVersionRequested;
69 wVersionRequested = MAKEWORD(2, 0);
71 res = WSAStartup(wVersionRequested, &wsaData);
72 xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
74 /* Confirm that the WinSock DLL supports 2.0. */
75 /* Note that if the DLL supports versions greater */
76 /* than 2.0 in addition to 2.0, it will still return */
77 /* 2.0 in wVersion since that is the version we */
80 xbt_assert(LOBYTE(wsaData.wVersion) == 2 &&
81 HIBYTE(wsaData.wVersion) == 0,
82 "Cannot find a usable WinSock DLL");
83 XBT_INFO("Found and initialized winsock2");
84 } /* The WinSock DLL is acceptable. Proceed. */
89 res = WSAStartup(0x0101, &wsaData);
90 xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
91 XBT_INFO("Found and initialized winsock");
96 gras_trp_plugin_new("file", gras_trp_file_setup);
97 gras_trp_plugin_new("sg", gras_trp_sg_setup);
98 gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
104 void gras_trp_exit(void)
106 XBT_DEBUG("gras_trp value %d", _gras_trp_started);
107 if (_gras_trp_started == 0) {
111 if (--_gras_trp_started == 0) {
112 #ifdef HAVE_WINSOCK_H
113 if (WSACleanup() == SOCKET_ERROR) {
114 if (WSAGetLastError() == WSAEINPROGRESS) {
115 WSACancelBlockingCall();
121 /* Delete the plugins */
122 xbt_dict_free(&_gras_trp_plugins);
127 void gras_trp_plugin_free(void *p)
129 gras_trp_plugin_t plug = p;
134 } else if (plug->data) {
135 XBT_DEBUG("Plugin %s lacks exit(). Free data anyway.", plug->name);
146 * gras_trp_socket_new:
148 * Malloc a new socket, and initialize it with defaults
150 void gras_trp_socket_new(int incoming, gras_socket_t * dst)
153 gras_socket_t sock = xbt_new0(s_gras_socket_t, 1);
155 XBT_VERB("Create a new socket (%p)", (void *) sock);
159 sock->incoming = incoming ? 1 : 0;
160 sock->outgoing = incoming ? 0 : 1;
161 sock->accepting = incoming ? 1 : 0;
171 sock->bufdata = NULL;
179 * @brief Opens a server socket and makes it ready to be listened to.
180 * @param port: port on which you want to listen
181 * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
182 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
184 * In real life, you'll get a TCP socket.
187 gras_socket_server_ext(unsigned short port,
188 unsigned long int buf_size, int measurement)
190 gras_trp_plugin_t trp;
193 XBT_DEBUG("Create a server socket from plugin %s on port %d",
194 gras_if_RL()? "tcp" : "sg", port);
195 trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
197 /* defaults settings */
198 gras_trp_socket_new(1, &sock);
200 sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
201 sock->meas = measurement;
203 /* Call plugin socket creation function */
204 XBT_DEBUG("Prepare socket with plugin (fct=%p)", trp->socket_server);
206 trp->socket_server(trp, port, sock);
207 XBT_DEBUG("in=%c out=%c accept=%c",
208 sock->incoming ? 'y' : 'n',
209 sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
217 ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
219 xbt_dynar_push(((gras_trp_procdata_t)
220 gras_libdata_by_id(gras_trp_libdata_id))->sockets,
223 gras_msg_listener_awake();
228 * @brief Opens a server socket on any port in the given range
230 * @param minport: first port we will try
231 * @param maxport: last port we will try
232 * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
233 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
235 * If none of the provided ports works, raises the exception got when trying the last possibility
238 gras_socket_server_range(unsigned short minport, unsigned short maxport,
239 unsigned long int buf_size, int measurement)
243 gras_socket_t res = NULL;
246 for (port = minport; port < maxport; port++) {
248 res = gras_socket_server_ext(port, buf_size, measurement);
262 * @brief Opens a client socket to a remote host.
263 * @param host: who you want to connect to
264 * @param port: where you want to connect to on this host
265 * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
266 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
268 * In real life, you'll get a TCP socket.
271 gras_socket_client_ext(const char *host,
273 unsigned long int buf_size, int measurement)
275 gras_trp_plugin_t trp;
278 trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
280 XBT_DEBUG("Create a client socket from plugin %s",
281 gras_if_RL()? "tcp" : "sg");
282 /* defaults settings */
283 gras_trp_socket_new(0, &sock);
285 sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
286 sock->meas = measurement;
288 /* plugin-specific */
290 (*trp->socket_client) (trp,host,port,sock);
291 XBT_DEBUG("in=%c out=%c accept=%c",
292 sock->incoming ? 'y' : 'n',
293 sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
299 xbt_dynar_push(((gras_trp_procdata_t)
300 gras_libdata_by_id(gras_trp_libdata_id))->sockets,
302 gras_msg_listener_awake();
307 * @brief Opens a server socket and make it ready to be listened to.
309 * In real life, you'll get a TCP socket.
311 gras_socket_t gras_socket_server(unsigned short port)
313 return gras_socket_server_ext(port, 32 * 1024, 0);
316 /** @brief Opens a client socket to a remote host */
317 gras_socket_t gras_socket_client(const char *host, unsigned short port)
319 return gras_socket_client_ext(host, port, 0, 0);
322 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
323 gras_socket_t gras_socket_client_from_string(const char *host)
325 xbt_peer_t p = xbt_peer_from_string(host);
326 gras_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
331 void gras_socket_close_voidp(void *sock)
333 gras_socket_close((gras_socket_t) sock);
336 /** \brief Close socket */
337 void gras_socket_close(gras_socket_t sock)
339 if (--sock->refcount)
342 xbt_dynar_t sockets =
343 ((gras_trp_procdata_t)
344 gras_libdata_by_id(gras_trp_libdata_id))->sockets;
345 gras_socket_t sock_iter = NULL;
349 XBT_VERB("Close %p", sock);
350 if (sock == _gras_lastly_selected_socket) {
351 xbt_assert(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
352 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
356 ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
357 _gras_lastly_selected_socket = NULL;
360 /* FIXME: Issue an event when the socket is closed */
361 XBT_DEBUG("sockets pointer before %p", sockets);
363 /* FIXME: Cannot get the dynar mutex, because it can be already locked */
364 // _xbt_dynar_foreach(sockets,cursor,sock_iter) {
365 for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
366 _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
367 if (sock == sock_iter) {
368 XBT_DEBUG("remove sock cursor %d dize %lu\n", cursor,
369 xbt_dynar_length(sockets));
370 xbt_dynar_cursor_rm(sockets, &cursor);
371 if (sock->plugin->socket_close)
372 (*sock->plugin->socket_close) (sock);
374 /* free the memory */
381 ("Ignoring request to free an unknown socket (%p). Execution stack:",
383 xbt_backtrace_display_current();
391 * Send a bunch of bytes from on socket
392 * (stable if we know the storage will keep as is until the next trp_flush)
394 void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable)
396 xbt_assert(sd->outgoing, "Socket not suited for data send");
397 (*sd->plugin->send) (sd, data, size, stable);
401 * gras_trp_chunk_recv:
403 * Receive a bunch of bytes from a socket
405 void gras_trp_recv(gras_socket_t sd, char *data, long int size)
407 xbt_assert(sd->incoming, "Socket not suited for data receive");
408 (sd->plugin->recv) (sd, data, size);
414 * Make sure all pending communications are done
416 void gras_trp_flush(gras_socket_t sd)
418 if (sd->plugin->flush)
419 (sd->plugin->flush) (sd);
422 gras_trp_plugin_t gras_trp_plugin_get_by_name(const char *name)
424 return xbt_dict_get(_gras_trp_plugins, name);
427 int gras_socket_my_port(gras_socket_t sock)
429 if (!sock->plugin->my_port)
430 THROWF(unknown_error,0,"Function my_port unimplemented in plugin %s",sock->plugin->name);
431 return (*sock->plugin->my_port)(sock);
435 int gras_socket_peer_port(gras_socket_t sock)
437 if (!sock->plugin->peer_port)
438 THROWF(unknown_error,0,"Function peer_port unimplemented in plugin %s",sock->plugin->name);
439 return (*sock->plugin->peer_port)(sock);
442 const char *gras_socket_peer_name(gras_socket_t sock)
444 xbt_assert(sock->plugin);
445 return (*sock->plugin->peer_name)(sock);
448 const char *gras_socket_peer_proc(gras_socket_t sock)
450 return (*sock->plugin->peer_proc)(sock);
453 void gras_socket_peer_proc_set(gras_socket_t sock, char *peer_proc)
455 return (*sock->plugin->peer_proc_set)(sock,peer_proc);
458 /** \brief Check if the provided socket is a measurement one (or a regular one) */
459 int gras_socket_is_meas(gras_socket_t sock)
464 /** \brief Send a chunk of (random) data over a measurement socket
466 * @param peer measurement socket to use for the experiment
467 * @param timeout timeout (in seconds)
468 * @param msg_size size of each chunk sent over the socket (in bytes).
469 * @param msg_amount how many of these packets you want to send.
471 * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
472 * each side of the socket should be paired.
474 * The exchanged data is zeroed to make sure it's initialized, but
475 * there is no way to control what is sent (ie, you cannot use these
476 * functions to exchange data out of band).
478 * @warning: in SimGrid version 3.1 and previous, the numerical arguments
479 * were the total amount of data to send and the msg_size. This
480 * was changed for the fool wanting to send more than MAXINT
481 * bytes in a fat pipe.
483 void gras_socket_meas_send(gras_socket_t peer,
484 unsigned int timeout,
485 unsigned long int msg_size,
486 unsigned long int msg_amount)
489 unsigned long int sent_sofar;
492 THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
493 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
495 chunk = xbt_malloc0(msg_size);
497 xbt_assert(peer->meas,
498 "Asked to send measurement data on a regular socket");
499 xbt_assert(peer->outgoing,
500 "Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
502 for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
503 XBT_CDEBUG(gras_trp_meas,
504 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
505 sent_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
506 gras_socket_peer_port(peer));
507 (*peer->plugin->raw_send) (peer, chunk, msg_size);
509 XBT_CDEBUG(gras_trp_meas,
510 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
511 msg_amount, msg_size, gras_socket_peer_name(peer),
512 gras_socket_peer_port(peer));
520 /** \brief Receive a chunk of data over a measurement socket
522 * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
523 * each side of the socket should be paired.
525 * @warning: in SimGrid version 3.1 and previous, the numerical arguments
526 * were the total amount of data to send and the msg_size. This
527 * was changed for the fool wanting to send more than MAXINT
528 * bytes in a fat pipe.
530 void gras_socket_meas_recv(gras_socket_t peer,
531 unsigned int timeout,
532 unsigned long int msg_size,
533 unsigned long int msg_amount)
537 unsigned long int got_sofar;
540 THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
541 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
544 chunk = xbt_malloc(msg_size);
546 xbt_assert(peer->meas,
547 "Asked to receive measurement data on a regular socket");
548 xbt_assert(peer->incoming, "Socket not suited for data receive");
550 for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
551 XBT_CDEBUG(gras_trp_meas,
552 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
553 got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
554 gras_socket_peer_port(peer));
555 (peer->plugin->raw_recv) (peer, chunk, msg_size);
557 XBT_CDEBUG(gras_trp_meas,
558 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
559 got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
560 gras_socket_peer_port(peer));
568 * \brief Something similar to the good old accept system call.
570 * Make sure that there is someone speaking to the provided server socket.
571 * In RL, it does an accept(2) and return the result as last argument.
572 * In SG, as accepts are useless, it returns the provided argument as result.
573 * You should thus test whether (peer != accepted) before closing both of them.
575 * You should only call this on measurement sockets. It is automatically
576 * done for regular sockets, but you usually want more control about
577 * what's going on with measurement sockets.
579 gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
582 THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
583 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
585 xbt_assert(peer->meas,
586 "No need to accept on non-measurement sockets (it's automatic)");
588 if (!peer->accepting) {
589 /* nothing to accept here (must be in SG) */
590 /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
594 res = (peer->plugin->socket_accept) (peer);
595 res->meas = peer->meas;
596 XBT_CDEBUG(gras_trp_meas, "meas_accepted onto %d", res->sd);
603 * Creating procdata for this module
605 static void *gras_trp_procdata_new(void)
607 gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
609 res->name = xbt_strdup("gras_trp");
611 res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
618 * Freeing procdata for this module
620 static void gras_trp_procdata_free(void *data)
622 gras_trp_procdata_t res = (gras_trp_procdata_t) data;
624 xbt_dynar_free(&(res->sockets));
629 void gras_trp_socketset_dump(const char *name)
631 gras_trp_procdata_t procdata =
632 (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
637 XBT_INFO("** Dump the socket set %s", name);
638 xbt_dynar_foreach(procdata->sockets, it, s) {
639 XBT_INFO(" %p -> %s:%d %s",
640 s, gras_socket_peer_name(s), gras_socket_peer_port(s),
641 s->valid ? "(valid)" : "(peer dead)");
643 XBT_INFO("** End of socket set %s", name);
647 * Module registration
649 int gras_trp_libdata_id;
650 void gras_trp_register()
652 gras_trp_libdata_id =
653 gras_procdata_add("gras_trp", gras_trp_procdata_new,
654 gras_trp_procdata_free);
657 int gras_os_myport(void)
659 return ((gras_trp_procdata_t)
660 gras_libdata_by_id(gras_trp_libdata_id))->myport;