1 /* transport - low level communication */
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
12 int gras_opt_trp_nomoredata_on_close = 0;
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
21 "Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas, gras_trp,
23 "Conveying bytes over the network without formating for perf measurements");
24 static short int _gras_trp_started = 0;
26 static xbt_dict_t _gras_trp_plugins; /* All registered plugins */
27 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
29 static void gras_trp_plugin_new(const char *name, gras_trp_setup_t setup)
33 gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
35 DEBUG1("Create plugin %s", name);
37 plug->name = xbt_strdup(name);
42 if (e.category == mismatch_error) {
43 /* SG plugin raise mismatch when in RL mode (and vice versa) */
54 xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
57 void gras_trp_init(void)
59 if (!_gras_trp_started) {
60 /* make room for all plugins */
61 _gras_trp_plugins = xbt_dict_new();
63 #ifdef HAVE_WINSOCK2_H
64 /* initialize the windows mechanism */
66 WORD wVersionRequested;
69 wVersionRequested = MAKEWORD(2, 0);
70 xbt_assert0(WSAStartup(wVersionRequested, &wsaData) == 0,
71 "Cannot find a usable WinSock DLL");
73 /* Confirm that the WinSock DLL supports 2.0. */
74 /* Note that if the DLL supports versions greater */
75 /* than 2.0 in addition to 2.0, it will still return */
76 /* 2.0 in wVersion since that is the version we */
79 xbt_assert0(LOBYTE(wsaData.wVersion) == 2 &&
80 HIBYTE(wsaData.wVersion) == 0,
81 "Cannot find a usable WinSock DLL");
82 INFO0("Found and initialized winsock2");
83 } /* The WinSock DLL is acceptable. Proceed. */
87 xbt_assert0(WSAStartup(0x0101, &wsaData) == 0,
88 "Cannot find a usable WinSock DLL");
89 INFO0("Found and initialized winsock");
94 gras_trp_plugin_new("file", gras_trp_file_setup);
95 gras_trp_plugin_new("sg", gras_trp_sg_setup);
96 gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
102 void gras_trp_exit(void)
104 DEBUG1("gras_trp value %d", _gras_trp_started);
105 if (_gras_trp_started == 0) {
109 if (--_gras_trp_started == 0) {
110 #ifdef HAVE_WINSOCK_H
111 if (WSACleanup() == SOCKET_ERROR) {
112 if (WSAGetLastError() == WSAEINPROGRESS) {
113 WSACancelBlockingCall();
119 /* Delete the plugins */
120 xbt_dict_free(&_gras_trp_plugins);
125 void gras_trp_plugin_free(void *p)
127 gras_trp_plugin_t plug = p;
132 } else if (plug->data) {
133 DEBUG1("Plugin %s lacks exit(). Free data anyway.", plug->name);
144 * gras_trp_socket_new:
146 * Malloc a new socket, and initialize it with defaults
148 void gras_trp_socket_new(int incoming, gras_socket_t * dst)
151 gras_socket_t sock = xbt_new0(s_gras_socket_t, 1);
153 VERB1("Create a new socket (%p)", (void *) sock);
157 sock->incoming = incoming ? 1 : 0;
158 sock->outgoing = incoming ? 0 : 1;
159 sock->accepting = incoming ? 1 : 0;
169 sock->bufdata = NULL;
177 * @brief Opens a server socket and makes it ready to be listened to.
178 * @param port: port on which you want to listen
179 * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
180 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
182 * In real life, you'll get a TCP socket.
185 gras_socket_server_ext(unsigned short port,
186 unsigned long int buf_size, int measurement)
190 gras_trp_plugin_t trp;
193 DEBUG2("Create a server socket from plugin %s on port %d",
194 gras_if_RL()? "tcp" : "sg", port);
195 trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
197 /* defaults settings */
198 gras_trp_socket_new(1, &sock);
200 sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
201 sock->meas = measurement;
203 /* Call plugin socket creation function */
204 DEBUG1("Prepare socket with plugin (fct=%p)", trp->socket_server);
206 trp->socket_server(trp, port, sock);
207 DEBUG3("in=%c out=%c accept=%c",
208 sock->incoming ? 'y' : 'n',
209 sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
217 ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
219 xbt_dynar_push(((gras_trp_procdata_t)
220 gras_libdata_by_id(gras_trp_libdata_id))->sockets,
223 gras_msg_listener_awake();
228 * @brief Opens a server socket on any port in the given range
230 * @param minport: first port we will try
231 * @param maxport: last port we will try
232 * @param buf_size: size of the buffer (in byte) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
233 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
235 * If none of the provided ports works, raises the exception got when trying the last possibility
238 gras_socket_server_range(unsigned short minport, unsigned short maxport,
239 unsigned long int buf_size, int measurement)
243 gras_socket_t res = NULL;
246 for (port = minport; port < maxport; port++) {
248 res = gras_socket_server_ext(port, buf_size, measurement);
262 * @brief Opens a client socket to a remote host.
263 * @param host: who you want to connect to
264 * @param port: where you want to connect to on this host
265 * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sain default is used (32k, but may change)
266 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
268 * In real life, you'll get a TCP socket.
271 gras_socket_client_ext(const char *host,
273 unsigned long int buf_size, int measurement)
277 gras_trp_plugin_t trp;
280 trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
282 DEBUG1("Create a client socket from plugin %s",
283 gras_if_RL()? "tcp" : "sg");
284 /* defaults settings */
285 gras_trp_socket_new(0, &sock);
287 sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
288 sock->meas = measurement;
290 /* plugin-specific */
292 (*trp->socket_client) (trp,host,port,sock);
293 DEBUG3("in=%c out=%c accept=%c",
294 sock->incoming ? 'y' : 'n',
295 sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
300 xbt_dynar_push(((gras_trp_procdata_t)
301 gras_libdata_by_id(gras_trp_libdata_id))->sockets,
303 gras_msg_listener_awake();
308 * @brief Opens a server socket and make it ready to be listened to.
310 * In real life, you'll get a TCP socket.
312 gras_socket_t gras_socket_server(unsigned short port)
314 return gras_socket_server_ext(port, 32 * 1024, 0);
317 /** @brief Opens a client socket to a remote host */
318 gras_socket_t gras_socket_client(const char *host, unsigned short port)
320 return gras_socket_client_ext(host, port, 0, 0);
323 /** @brief Opens a client socket to a remote host specified as '\a host:\a port' */
324 gras_socket_t gras_socket_client_from_string(const char *host)
326 xbt_peer_t p = xbt_peer_from_string(host);
327 gras_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
332 void gras_socket_close_voidp(void *sock)
334 gras_socket_close((gras_socket_t) sock);
337 /** \brief Close socket */
338 void gras_socket_close(gras_socket_t sock)
340 if (--sock->refcount)
343 xbt_dynar_t sockets =
344 ((gras_trp_procdata_t)
345 gras_libdata_by_id(gras_trp_libdata_id))->sockets;
346 gras_socket_t sock_iter = NULL;
350 VERB1("Close %p", sock);
351 if (sock == _gras_lastly_selected_socket) {
352 xbt_assert0(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
353 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
357 ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
358 _gras_lastly_selected_socket = NULL;
361 /* FIXME: Issue an event when the socket is closed */
362 DEBUG1("sockets pointer before %p", sockets);
364 /* FIXME: Cannot get the dynar mutex, because it can be already locked */
365 // _xbt_dynar_foreach(sockets,cursor,sock_iter) {
366 for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
367 _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
368 if (sock == sock_iter) {
369 DEBUG2("remove sock cursor %d dize %lu\n", cursor,
370 xbt_dynar_length(sockets));
371 xbt_dynar_cursor_rm(sockets, &cursor);
372 if (sock->plugin->socket_close)
373 (*sock->plugin->socket_close) (sock);
375 /* free the memory */
382 ("Ignoring request to free an unknown socket (%p). Execution stack:",
384 xbt_backtrace_display_current();
392 * Send a bunch of bytes from on socket
393 * (stable if we know the storage will keep as is until the next trp_flush)
395 void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable)
397 xbt_assert0(sd->outgoing, "Socket not suited for data send");
398 (*sd->plugin->send) (sd, data, size, stable);
402 * gras_trp_chunk_recv:
404 * Receive a bunch of bytes from a socket
406 void gras_trp_recv(gras_socket_t sd, char *data, long int size)
408 xbt_assert0(sd->incoming, "Socket not suited for data receive");
409 (sd->plugin->recv) (sd, data, size);
415 * Make sure all pending communications are done
417 void gras_trp_flush(gras_socket_t sd)
419 if (sd->plugin->flush)
420 (sd->plugin->flush) (sd);
423 gras_trp_plugin_t gras_trp_plugin_get_by_name(const char *name)
425 return xbt_dict_get(_gras_trp_plugins, name);
428 int gras_socket_my_port(gras_socket_t sock)
430 if (!sock->plugin->my_port)
431 THROW1(unknown_error,0,"Function my_port unimplemented in plugin %s",sock->plugin->name);
432 return (*sock->plugin->my_port)(sock);
436 int gras_socket_peer_port(gras_socket_t sock)
438 if (!sock->plugin->peer_port)
439 THROW1(unknown_error,0,"Function peer_port unimplemented in plugin %s",sock->plugin->name);
440 return (*sock->plugin->peer_port)(sock);
443 const char *gras_socket_peer_name(gras_socket_t sock)
445 xbt_assert(sock->plugin);
446 return (*sock->plugin->peer_name)(sock);
449 const char *gras_socket_peer_proc(gras_socket_t sock)
451 return (*sock->plugin->peer_proc)(sock);
454 void gras_socket_peer_proc_set(gras_socket_t sock, char *peer_proc)
456 return (*sock->plugin->peer_proc_set)(sock,peer_proc);
459 /** \brief Check if the provided socket is a measurement one (or a regular one) */
460 int gras_socket_is_meas(gras_socket_t sock)
465 /** \brief Send a chunk of (random) data over a measurement socket
467 * @param peer measurement socket to use for the experiment
468 * @param timeout timeout (in seconds)
469 * @param msg_size size of each chunk sent over the socket (in bytes).
470 * @param msg_amount how many of these packets you want to send.
472 * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
473 * each side of the socket should be paired.
475 * The exchanged data is zeroed to make sure it's initialized, but
476 * there is no way to control what is sent (ie, you cannot use these
477 * functions to exchange data out of band).
479 * @warning: in SimGrid version 3.1 and previous, the numerical arguments
480 * were the total amount of data to send and the msg_size. This
481 * was changed for the fool wanting to send more than MAXINT
482 * bytes in a fat pipe.
484 void gras_socket_meas_send(gras_socket_t peer,
485 unsigned int timeout,
486 unsigned long int msg_size,
487 unsigned long int msg_amount)
490 unsigned long int sent_sofar;
493 THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
494 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
496 chunk = xbt_malloc0(msg_size);
498 xbt_assert0(peer->meas,
499 "Asked to send measurement data on a regular socket");
500 xbt_assert0(peer->outgoing,
501 "Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
503 for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
504 CDEBUG5(gras_trp_meas,
505 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
506 sent_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
507 gras_socket_peer_port(peer));
508 (*peer->plugin->raw_send) (peer, chunk, msg_size);
510 CDEBUG5(gras_trp_meas,
511 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
512 msg_amount, msg_size, gras_socket_peer_name(peer),
513 gras_socket_peer_port(peer));
521 /** \brief Receive a chunk of data over a measurement socket
523 * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
524 * each side of the socket should be paired.
526 * @warning: in SimGrid version 3.1 and previous, the numerical arguments
527 * were the total amount of data to send and the msg_size. This
528 * was changed for the fool wanting to send more than MAXINT
529 * bytes in a fat pipe.
531 void gras_socket_meas_recv(gras_socket_t peer,
532 unsigned int timeout,
533 unsigned long int msg_size,
534 unsigned long int msg_amount)
538 unsigned long int got_sofar;
541 THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
542 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
545 chunk = xbt_malloc(msg_size);
547 xbt_assert0(peer->meas,
548 "Asked to receive measurement data on a regular socket");
549 xbt_assert0(peer->incoming, "Socket not suited for data receive");
551 for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
552 CDEBUG5(gras_trp_meas,
553 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
554 got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
555 gras_socket_peer_port(peer));
556 (peer->plugin->raw_recv) (peer, chunk, msg_size);
558 CDEBUG5(gras_trp_meas,
559 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
560 got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
561 gras_socket_peer_port(peer));
569 * \brief Something similar to the good old accept system call.
571 * Make sure that there is someone speaking to the provided server socket.
572 * In RL, it does an accept(2) and return the result as last argument.
573 * In SG, as accepts are useless, it returns the provided argument as result.
574 * You should thus test whether (peer != accepted) before closing both of them.
576 * You should only call this on measurement sockets. It is automatically
577 * done for regular sockets, but you usually want more control about
578 * what's going on with measurement sockets.
580 gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
583 THROW0(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
584 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
586 xbt_assert0(peer->meas,
587 "No need to accept on non-measurement sockets (it's automatic)");
589 if (!peer->accepting) {
590 /* nothing to accept here (must be in SG) */
591 /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
595 res = (peer->plugin->socket_accept) (peer);
596 res->meas = peer->meas;
597 CDEBUG1(gras_trp_meas, "meas_accepted onto %d", res->sd);
604 * Creating procdata for this module
606 static void *gras_trp_procdata_new(void)
608 gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
610 res->name = xbt_strdup("gras_trp");
612 res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
619 * Freeing procdata for this module
621 static void gras_trp_procdata_free(void *data)
623 gras_trp_procdata_t res = (gras_trp_procdata_t) data;
625 xbt_dynar_free(&(res->sockets));
630 void gras_trp_socketset_dump(const char *name)
632 gras_trp_procdata_t procdata =
633 (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
638 INFO1("** Dump the socket set %s", name);
639 xbt_dynar_foreach(procdata->sockets, it, s) {
640 INFO4(" %p -> %s:%d %s",
641 s, gras_socket_peer_name(s), gras_socket_peer_port(s),
642 s->valid ? "(valid)" : "(peer dead)");
644 INFO1("** End of socket set %s", name);
648 * Module registration
650 int gras_trp_libdata_id;
651 void gras_trp_register()
653 gras_trp_libdata_id =
654 gras_procdata_add("gras_trp", gras_trp_procdata_new,
655 gras_trp_procdata_free);
658 int gras_os_myport(void)
660 return ((gras_trp_procdata_t)
661 gras_libdata_by_id(gras_trp_libdata_id))->myport;