1 /* transport - low level communication */
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
12 int gras_opt_trp_nomoredata_on_close = 0;
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
21 "Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas, gras_trp,
23 "Conveying bytes over the network without formating for perf measurements");
24 static short int _gras_trp_started = 0;
26 static xbt_dict_t _gras_trp_plugins; /* All registered plugins */
27 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
29 static void gras_trp_plugin_new(const char *name, gras_trp_setup_t setup)
33 gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
35 XBT_DEBUG("Create plugin %s", name);
37 plug->name = xbt_strdup(name);
42 if (e.category == mismatch_error) {
43 /* SG plugin raise mismatch when in RL mode (and vice versa) */
54 xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
57 void gras_trp_init(void)
59 if (!_gras_trp_started) {
60 /* make room for all plugins */
61 _gras_trp_plugins = xbt_dict_new();
63 #ifdef HAVE_WINSOCK2_H
64 /* initialize the windows mechanism */
66 WORD wVersionRequested;
69 wVersionRequested = MAKEWORD(2, 0);
71 res = WSAStartup(wVersionRequested, &wsaData);
72 xbt_assert0(res == 0, "Cannot find a usable WinSock DLL");
74 /* Confirm that the WinSock DLL supports 2.0. */
75 /* Note that if the DLL supports versions greater */
76 /* than 2.0 in addition to 2.0, it will still return */
77 /* 2.0 in wVersion since that is the version we */
80 xbt_assert0(LOBYTE(wsaData.wVersion) == 2 &&
81 HIBYTE(wsaData.wVersion) == 0,
82 "Cannot find a usable WinSock DLL");
83 XBT_INFO("Found and initialized winsock2");
84 } /* The WinSock DLL is acceptable. Proceed. */
89 res = WSAStartup(0x0101, &wsaData);
90 xbt_assert0(res == 0, "Cannot find a usable WinSock DLL");
91 XBT_INFO("Found and initialized winsock");
96 gras_trp_plugin_new("file", gras_trp_file_setup);
97 gras_trp_plugin_new("sg", gras_trp_sg_setup);
98 gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
104 void gras_trp_exit(void)
106 XBT_DEBUG("gras_trp value %d", _gras_trp_started);
107 if (_gras_trp_started == 0) {
111 if (--_gras_trp_started == 0) {
112 #ifdef HAVE_WINSOCK_H
113 if (WSACleanup() == SOCKET_ERROR) {
114 if (WSAGetLastError() == WSAEINPROGRESS) {
115 WSACancelBlockingCall();
121 /* Delete the plugins */
122 xbt_dict_free(&_gras_trp_plugins);
127 void gras_trp_plugin_free(void *p)
129 gras_trp_plugin_t plug = p;
134 } else if (plug->data) {
135 XBT_DEBUG("Plugin %s lacks exit(). Free data anyway.", plug->name);
146 * gras_trp_socket_new:
148 * Malloc a new socket, and initialize it with defaults
150 void gras_trp_socket_new(int incoming, gras_socket_t * dst)
153 gras_socket_t sock = xbt_new0(s_gras_socket_t, 1);
155 XBT_VERB("Create a new socket (%p)", (void *) sock);
159 sock->incoming = incoming ? 1 : 0;
160 sock->outgoing = incoming ? 0 : 1;
161 sock->accepting = incoming ? 1 : 0;
171 sock->bufdata = NULL;
179 * @brief Opens a server socket and makes it ready to be listened to.
180 * @param port: port on which you want to listen
181 * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
182 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
184 * In real life, you'll get a TCP socket.
187 gras_socket_server_ext(unsigned short port,
188 unsigned long int buf_size, int measurement)
192 gras_trp_plugin_t trp;
195 XBT_DEBUG("Create a server socket from plugin %s on port %d",
196 gras_if_RL()? "tcp" : "sg", port);
197 trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
199 /* defaults settings */
200 gras_trp_socket_new(1, &sock);
202 sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
203 sock->meas = measurement;
205 /* Call plugin socket creation function */
206 XBT_DEBUG("Prepare socket with plugin (fct=%p)", trp->socket_server);
208 trp->socket_server(trp, port, sock);
209 XBT_DEBUG("in=%c out=%c accept=%c",
210 sock->incoming ? 'y' : 'n',
211 sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
219 ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
221 xbt_dynar_push(((gras_trp_procdata_t)
222 gras_libdata_by_id(gras_trp_libdata_id))->sockets,
225 gras_msg_listener_awake();
230 * @brief Opens a server socket on any port in the given range
232 * @param minport: first port we will try
233 * @param maxport: last port we will try
234 * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
235 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
237 * If none of the provided ports works, raises the exception got when trying the last possibility
240 gras_socket_server_range(unsigned short minport, unsigned short maxport,
241 unsigned long int buf_size, int measurement)
245 gras_socket_t res = NULL;
248 for (port = minport; port < maxport; port++) {
250 res = gras_socket_server_ext(port, buf_size, measurement);
264 * @brief Opens a client socket to a remote host.
265 * @param host: who you want to connect to
266 * @param port: where you want to connect to on this host
267 * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
268 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
270 * In real life, you'll get a TCP socket.
273 gras_socket_client_ext(const char *host,
275 unsigned long int buf_size, int measurement)
279 gras_trp_plugin_t trp;
282 trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
284 XBT_DEBUG("Create a client socket from plugin %s",
285 gras_if_RL()? "tcp" : "sg");
286 /* defaults settings */
287 gras_trp_socket_new(0, &sock);
289 sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
290 sock->meas = measurement;
292 /* plugin-specific */
294 (*trp->socket_client) (trp,host,port,sock);
295 XBT_DEBUG("in=%c out=%c accept=%c",
296 sock->incoming ? 'y' : 'n',
297 sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
302 xbt_dynar_push(((gras_trp_procdata_t)
303 gras_libdata_by_id(gras_trp_libdata_id))->sockets,
305 gras_msg_listener_awake();
310 * @brief Opens a server socket and make it ready to be listened to.
312 * In real life, you'll get a TCP socket.
314 gras_socket_t gras_socket_server(unsigned short port)
316 return gras_socket_server_ext(port, 32 * 1024, 0);
319 /** @brief Opens a client socket to a remote host */
320 gras_socket_t gras_socket_client(const char *host, unsigned short port)
322 return gras_socket_client_ext(host, port, 0, 0);
325 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
326 gras_socket_t gras_socket_client_from_string(const char *host)
328 xbt_peer_t p = xbt_peer_from_string(host);
329 gras_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
334 void gras_socket_close_voidp(void *sock)
336 gras_socket_close((gras_socket_t) sock);
339 /** \brief Close socket */
340 void gras_socket_close(gras_socket_t sock)
342 if (--sock->refcount)
345 xbt_dynar_t sockets =
346 ((gras_trp_procdata_t)
347 gras_libdata_by_id(gras_trp_libdata_id))->sockets;
348 gras_socket_t sock_iter = NULL;
352 XBT_VERB("Close %p", sock);
353 if (sock == _gras_lastly_selected_socket) {
354 xbt_assert0(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
355 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
359 ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
360 _gras_lastly_selected_socket = NULL;
363 /* FIXME: Issue an event when the socket is closed */
364 XBT_DEBUG("sockets pointer before %p", sockets);
366 /* FIXME: Cannot get the dynar mutex, because it can be already locked */
367 // _xbt_dynar_foreach(sockets,cursor,sock_iter) {
368 for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
369 _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
370 if (sock == sock_iter) {
371 XBT_DEBUG("remove sock cursor %d dize %lu\n", cursor,
372 xbt_dynar_length(sockets));
373 xbt_dynar_cursor_rm(sockets, &cursor);
374 if (sock->plugin->socket_close)
375 (*sock->plugin->socket_close) (sock);
377 /* free the memory */
384 ("Ignoring request to free an unknown socket (%p). Execution stack:",
386 xbt_backtrace_display_current();
394 * Send a bunch of bytes from on socket
395 * (stable if we know the storage will keep as is until the next trp_flush)
397 void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable)
399 xbt_assert0(sd->outgoing, "Socket not suited for data send");
400 (*sd->plugin->send) (sd, data, size, stable);
404 * gras_trp_chunk_recv:
406 * Receive a bunch of bytes from a socket
408 void gras_trp_recv(gras_socket_t sd, char *data, long int size)
410 xbt_assert0(sd->incoming, "Socket not suited for data receive");
411 (sd->plugin->recv) (sd, data, size);
417 * Make sure all pending communications are done
419 void gras_trp_flush(gras_socket_t sd)
421 if (sd->plugin->flush)
422 (sd->plugin->flush) (sd);
425 gras_trp_plugin_t gras_trp_plugin_get_by_name(const char *name)
427 return xbt_dict_get(_gras_trp_plugins, name);
430 int gras_socket_my_port(gras_socket_t sock)
432 if (!sock->plugin->my_port)
433 THROW1(unknown_error,0,"Function my_port unimplemented in plugin %s",sock->plugin->name);
434 return (*sock->plugin->my_port)(sock);
438 int gras_socket_peer_port(gras_socket_t sock)
440 if (!sock->plugin->peer_port)
441 THROW1(unknown_error,0,"Function peer_port unimplemented in plugin %s",sock->plugin->name);
442 return (*sock->plugin->peer_port)(sock);
445 const char *gras_socket_peer_name(gras_socket_t sock)
447 xbt_assert(sock->plugin);
448 return (*sock->plugin->peer_name)(sock);
451 const char *gras_socket_peer_proc(gras_socket_t sock)
453 return (*sock->plugin->peer_proc)(sock);
456 void gras_socket_peer_proc_set(gras_socket_t sock, char *peer_proc)
458 return (*sock->plugin->peer_proc_set)(sock,peer_proc);
461 /** \brief Check if the provided socket is a measurement one (or a regular one) */
462 int gras_socket_is_meas(gras_socket_t sock)
467 /** \brief Send a chunk of (random) data over a measurement socket
469 * @param peer measurement socket to use for the experiment
470 * @param timeout timeout (in seconds)
471 * @param msg_size size of each chunk sent over the socket (in bytes).
472 * @param msg_amount how many of these packets you want to send.
474 * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
475 * each side of the socket should be paired.
477 * The exchanged data is zeroed to make sure it's initialized, but
478 * there is no way to control what is sent (ie, you cannot use these
479 * functions to exchange data out of band).
481 * @warning: in SimGrid version 3.1 and previous, the numerical arguments
482 * were the total amount of data to send and the msg_size. This
483 * was changed for the fool wanting to send more than MAXINT
484 * bytes in a fat pipe.
486 void gras_socket_meas_send(gras_socket_t peer,
487 unsigned int timeout,
488 unsigned long int msg_size,
489 unsigned long int msg_amount)
492 unsigned long int sent_sofar;
495 THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
496 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
498 chunk = xbt_malloc0(msg_size);
500 xbt_assert0(peer->meas,
501 "Asked to send measurement data on a regular socket");
502 xbt_assert0(peer->outgoing,
503 "Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
505 for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
506 XBT_CDEBUG(gras_trp_meas,
507 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
508 sent_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
509 gras_socket_peer_port(peer));
510 (*peer->plugin->raw_send) (peer, chunk, msg_size);
512 XBT_CDEBUG(gras_trp_meas,
513 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
514 msg_amount, msg_size, gras_socket_peer_name(peer),
515 gras_socket_peer_port(peer));
523 /** \brief Receive a chunk of data over a measurement socket
525 * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
526 * each side of the socket should be paired.
528 * @warning: in SimGrid version 3.1 and previous, the numerical arguments
529 * were the total amount of data to send and the msg_size. This
530 * was changed for the fool wanting to send more than MAXINT
531 * bytes in a fat pipe.
533 void gras_socket_meas_recv(gras_socket_t peer,
534 unsigned int timeout,
535 unsigned long int msg_size,
536 unsigned long int msg_amount)
540 unsigned long int got_sofar;
543 THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
544 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
547 chunk = xbt_malloc(msg_size);
549 xbt_assert0(peer->meas,
550 "Asked to receive measurement data on a regular socket");
551 xbt_assert0(peer->incoming, "Socket not suited for data receive");
553 for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
554 XBT_CDEBUG(gras_trp_meas,
555 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
556 got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
557 gras_socket_peer_port(peer));
558 (peer->plugin->raw_recv) (peer, chunk, msg_size);
560 XBT_CDEBUG(gras_trp_meas,
561 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
562 got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
563 gras_socket_peer_port(peer));
571 * \brief Something similar to the good old accept system call.
573 * Make sure that there is someone speaking to the provided server socket.
574 * In RL, it does an accept(2) and return the result as last argument.
575 * In SG, as accepts are useless, it returns the provided argument as result.
576 * You should thus test whether (peer != accepted) before closing both of them.
578 * You should only call this on measurement sockets. It is automatically
579 * done for regular sockets, but you usually want more control about
580 * what's going on with measurement sockets.
582 gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
585 THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
586 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
588 xbt_assert0(peer->meas,
589 "No need to accept on non-measurement sockets (it's automatic)");
591 if (!peer->accepting) {
592 /* nothing to accept here (must be in SG) */
593 /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
597 res = (peer->plugin->socket_accept) (peer);
598 res->meas = peer->meas;
599 XBT_CDEBUG(gras_trp_meas, "meas_accepted onto %d", res->sd);
606 * Creating procdata for this module
608 static void *gras_trp_procdata_new(void)
610 gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
612 res->name = xbt_strdup("gras_trp");
614 res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
621 * Freeing procdata for this module
623 static void gras_trp_procdata_free(void *data)
625 gras_trp_procdata_t res = (gras_trp_procdata_t) data;
627 xbt_dynar_free(&(res->sockets));
632 void gras_trp_socketset_dump(const char *name)
634 gras_trp_procdata_t procdata =
635 (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
640 XBT_INFO("** Dump the socket set %s", name);
641 xbt_dynar_foreach(procdata->sockets, it, s) {
642 XBT_INFO(" %p -> %s:%d %s",
643 s, gras_socket_peer_name(s), gras_socket_peer_port(s),
644 s->valid ? "(valid)" : "(peer dead)");
646 XBT_INFO("** End of socket set %s", name);
650 * Module registration
652 int gras_trp_libdata_id;
653 void gras_trp_register()
655 gras_trp_libdata_id =
656 gras_procdata_add("gras_trp", gras_trp_procdata_new,
657 gras_trp_procdata_free);
660 int gras_os_myport(void)
662 return ((gras_trp_procdata_t)
663 gras_libdata_by_id(gras_trp_libdata_id))->myport;