X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/0174b734fea5f64c8c28a1b489ff1cd6e824ca35..791f832cea6e18633bc614246193551293542bbd:/src/gras/Transport/transport.c diff --git a/src/gras/Transport/transport.c b/src/gras/Transport/transport.c index 9aec7307e3..fda7851e7c 100644 --- a/src/gras/Transport/transport.c +++ b/src/gras/Transport/transport.c @@ -11,7 +11,7 @@ #include "gras/Transport/transport_private.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(transport,gras,"Conveying bytes over the network"); -XBT_LOG_NEW_SUBCATEGORY(raw_trp,transport,"Conveying bytes over the network without formating"); +XBT_LOG_NEW_SUBCATEGORY(trp_meas,transport,"Conveying bytes over the network without formating for perf measurements"); static short int _gras_trp_started = 0; static xbt_dict_t _gras_trp_plugins; /* All registered plugins */ @@ -31,8 +31,8 @@ gras_trp_plugin_new(const char *name, gras_trp_setup_t setup) { switch (errcode) { case mismatch_error: /* SG plugin return mismatch when in RL mode (and vice versa) */ - xbt_free(plug->name); - xbt_free(plug); + free(plug->name); + free(plug); break; case no_error: @@ -52,7 +52,7 @@ void gras_trp_init(void){ _gras_trp_plugins=xbt_dict_new(); #ifdef HAVE_WINSOCK2_H - /* initialize the windows mecanism */ + /* initialize the windows mechanism */ { WORD wVersionRequested; WSADATA wsaData; @@ -94,6 +94,10 @@ void gras_trp_init(void){ void gras_trp_exit(void){ + xbt_dynar_t sockets = gras_socketset_get(); + gras_socket_t sock_iter; + int cursor; + if (_gras_trp_started == 0) { return; } @@ -108,6 +112,14 @@ gras_trp_exit(void){ } #endif + /* Close all the sockets */ + xbt_dynar_foreach(sockets,cursor,sock_iter) { + VERB1("Closing the socket %p left open on exit. Maybe a socket leak?", + sock_iter); + gras_socket_close(sock_iter); + } + + /* Delete the plugins */ xbt_dict_free(&_gras_trp_plugins); } } @@ -121,11 +133,11 @@ void gras_trp_plugin_free(void *p) { plug->exit(plug); } else if (plug->data) { DEBUG1("Plugin %s lacks exit(). Free data anyway.",plug->name); - xbt_free(plug->data); + free(plug->data); } - xbt_free(plug->name); - xbt_free(plug); + free(plug->name); + free(plug); } } @@ -153,7 +165,7 @@ void gras_trp_socket_new(int incoming, sock->port = -1; sock->peer_port = -1; sock->peer_name = NULL; - sock->raw = 0; + sock->meas = 0; *dst = sock; @@ -172,7 +184,7 @@ xbt_error_t gras_socket_server_ext(unsigned short port, unsigned long int bufSize, - int raw, + int measurement, /* OUT */ gras_socket_t *dst) { @@ -185,14 +197,16 @@ gras_socket_server_ext(unsigned short port, DEBUG2("Create a server socket from plugin %s on port %d", gras_if_RL() ? "tcp" : "sg", port); - TRY(gras_trp_plugin_get_by_name("buf",&trp)); + TRY(gras_trp_plugin_get_by_name((measurement? (gras_if_RL() ? "tcp" : "sg") + :"buf"), + &trp)); /* defaults settings */ gras_trp_socket_new(1,&sock); sock->plugin= trp; sock->port=port; sock->bufSize = bufSize; - sock->raw = raw; + sock->meas = measurement; /* Call plugin socket creation function */ DEBUG1("Prepare socket with plugin (fct=%p)",trp->socket_server); @@ -203,7 +217,7 @@ gras_socket_server_ext(unsigned short port, sock->accepting?'y':'n'); if (errcode != no_error) { - xbt_free(sock); + free(sock); return errcode; } @@ -223,7 +237,7 @@ gras_socket_client_ext(const char *host, unsigned short port, unsigned long int bufSize, - int raw, + int measurement, /* OUT */ gras_socket_t *dst) { @@ -233,7 +247,9 @@ gras_socket_client_ext(const char *host, *dst = NULL; - TRY(gras_trp_plugin_get_by_name("buf",&trp)); + TRY(gras_trp_plugin_get_by_name((measurement? (gras_if_RL() ? "tcp" : "sg") + :"buf"), + &trp)); DEBUG1("Create a client socket from plugin %s",gras_if_RL() ? "tcp" : "sg"); /* defaults settings */ @@ -242,7 +258,7 @@ gras_socket_client_ext(const char *host, sock->peer_port = port; sock->peer_name = (char*)strdup(host?host:"localhost"); sock->bufSize = bufSize; - sock->raw = raw; + sock->meas = measurement; /* plugin-specific */ errcode= (*trp->socket_client)(trp, sock); @@ -252,7 +268,7 @@ gras_socket_client_ext(const char *host, sock->accepting?'y':'n'); if (errcode != no_error) { - xbt_free(sock); + free(sock); return errcode; } @@ -292,6 +308,7 @@ void gras_socket_close(gras_socket_t sock) { gras_socket_t sock_iter; int cursor; + XBT_IN; /* FIXME: Issue an event when the socket is closed */ if (sock) { xbt_dynar_foreach(sockets,cursor,sock_iter) { @@ -302,13 +319,15 @@ void gras_socket_close(gras_socket_t sock) { /* free the memory */ if (sock->peer_name) - xbt_free(sock->peer_name); - xbt_free(sock); + free(sock->peer_name); + free(sock); + XBT_OUT; return; } } - WARN0("Ignoring request to free an unknown socket"); + WARN1("Ignoring request to free an unknown socket (%p)",sock); } + XBT_OUT; } /** @@ -372,49 +391,128 @@ char *gras_socket_peer_name(gras_socket_t sock) { return sock->peer_name; } -xbt_error_t gras_socket_raw_send(gras_socket_t peer, +xbt_error_t gras_socket_meas_send(gras_socket_t peer, unsigned int timeout, unsigned long int exp_size, unsigned long int msg_size) { xbt_error_t errcode; - char *chunk = xbt_malloc(msg_size); - int exp_sofar; + char *chunk = xbt_malloc0(msg_size); + unsigned long int exp_sofar; - xbt_assert0(peer->raw,"Asked to send raw data on a regular socket"); + XBT_CIN(trp_meas); + + xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket"); + for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) { - CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d", + CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); TRY(gras_trp_chunk_send(peer,chunk,msg_size)); } - CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d", + CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); - xbt_free(chunk); - return no_error;/* gras_socket_raw_exchange(peer,1,timeout,expSize,msgSize); */ + free(chunk); + + XBT_COUT(trp_meas); + return no_error; } -xbt_error_t gras_socket_raw_recv(gras_socket_t peer, +xbt_error_t gras_socket_meas_recv(gras_socket_t peer, unsigned int timeout, unsigned long int exp_size, unsigned long int msg_size){ xbt_error_t errcode; char *chunk = xbt_malloc(msg_size); - int exp_sofar; + unsigned long int exp_sofar; + + XBT_CIN(trp_meas); + + xbt_assert0(peer->meas,"Asked to receive measurement data on a regular socket\n"); - xbt_assert0(peer->raw,"Asked to recveive raw data on a regular socket\n"); for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) { - CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d", + CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); TRY(gras_trp_chunk_recv(peer,chunk,msg_size)); } - CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d", + CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); - xbt_free(chunk); - return no_error;/* gras_socket_raw_exchange(peer,0,timeout,expSize,msgSize); */ + free(chunk); + XBT_COUT(trp_meas); + + return no_error; +} + +/** + * \brief Something similar to the good old accept system call. + * + * Make sure that there is someone speaking to the provided server socket. + * In RL, it does an accept(2), close the master socket, and put the accepted + * socket in place of the provided one. In SG, it does not do anything for + * + * You should only call this on measurement sockets. It is automatically + * done for regular sockets, but you usually want more control about + * what's going on with measurement sockets. + * + * + */ +xbt_error_t gras_socket_meas_accept(gras_socket_t peer, gras_socket_t *accepted){ + xbt_error_t errcode; + gras_socket_t res; + + xbt_assert0(peer->meas, + "No need to accept on non-measurement sockets (it's automatic)"); + + if (!peer->accepting) { + /* nothing to accept here */ + *accepted=peer; + return no_error; + } + + TRY((peer->plugin->socket_accept)(peer,accepted)); + (*accepted)->meas = peer->meas; + CDEBUG1(trp_meas,"meas_accepted onto %d",(*accepted)->sd); + + return no_error; +} + + +/* + * Creating procdata for this module + */ +static void *gras_trp_procdata_new() { + gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t,1); + + res->sockets = xbt_dynar_new(sizeof(gras_socket_t*), NULL); + + return (void*)res; +} + +/* + * Freeing procdata for this module + */ +static void gras_trp_procdata_free(void *data) { + gras_trp_procdata_t res = (gras_trp_procdata_t)data; + + xbt_dynar_free(&( res->sockets )); + free(res); +} + +/* + * Module registration + */ +void gras_trp_register() { + gras_procdata_add("gras_trp",gras_trp_procdata_new, gras_trp_procdata_free); +} + + +xbt_dynar_t +gras_socketset_get(void) { + /* FIXME: KILLME */ + return ((gras_trp_procdata_t) gras_libdata_get("gras_trp"))->sockets; }