X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/823d34beebb36450fa3074133e498b21b3d53f02..791f832cea6e18633bc614246193551293542bbd:/src/gras/Transport/transport.c diff --git a/src/gras/Transport/transport.c b/src/gras/Transport/transport.c index 6cb3f1e5bd..fda7851e7c 100644 --- a/src/gras/Transport/transport.c +++ b/src/gras/Transport/transport.c @@ -11,7 +11,7 @@ #include "gras/Transport/transport_private.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(transport,gras,"Conveying bytes over the network"); -XBT_LOG_NEW_SUBCATEGORY(raw_trp,transport,"Conveying bytes over the network without formating"); +XBT_LOG_NEW_SUBCATEGORY(trp_meas,transport,"Conveying bytes over the network without formating for perf measurements"); static short int _gras_trp_started = 0; static xbt_dict_t _gras_trp_plugins; /* All registered plugins */ @@ -94,6 +94,10 @@ void gras_trp_init(void){ void gras_trp_exit(void){ + xbt_dynar_t sockets = gras_socketset_get(); + gras_socket_t sock_iter; + int cursor; + if (_gras_trp_started == 0) { return; } @@ -108,6 +112,14 @@ gras_trp_exit(void){ } #endif + /* Close all the sockets */ + xbt_dynar_foreach(sockets,cursor,sock_iter) { + VERB1("Closing the socket %p left open on exit. Maybe a socket leak?", + sock_iter); + gras_socket_close(sock_iter); + } + + /* Delete the plugins */ xbt_dict_free(&_gras_trp_plugins); } } @@ -153,7 +165,7 @@ void gras_trp_socket_new(int incoming, sock->port = -1; sock->peer_port = -1; sock->peer_name = NULL; - sock->raw = 0; + sock->meas = 0; *dst = sock; @@ -172,7 +184,7 @@ xbt_error_t gras_socket_server_ext(unsigned short port, unsigned long int bufSize, - int raw, + int measurement, /* OUT */ gras_socket_t *dst) { @@ -185,14 +197,16 @@ gras_socket_server_ext(unsigned short port, DEBUG2("Create a server socket from plugin %s on port %d", gras_if_RL() ? "tcp" : "sg", port); - TRY(gras_trp_plugin_get_by_name("buf",&trp)); + TRY(gras_trp_plugin_get_by_name((measurement? (gras_if_RL() ? 
"tcp" : "sg") + :"buf"), + &trp)); /* defaults settings */ gras_trp_socket_new(1,&sock); sock->plugin= trp; sock->port=port; sock->bufSize = bufSize; - sock->raw = raw; + sock->meas = measurement; /* Call plugin socket creation function */ DEBUG1("Prepare socket with plugin (fct=%p)",trp->socket_server); @@ -223,7 +237,7 @@ gras_socket_client_ext(const char *host, unsigned short port, unsigned long int bufSize, - int raw, + int measurement, /* OUT */ gras_socket_t *dst) { @@ -233,7 +247,9 @@ gras_socket_client_ext(const char *host, *dst = NULL; - TRY(gras_trp_plugin_get_by_name("buf",&trp)); + TRY(gras_trp_plugin_get_by_name((measurement? (gras_if_RL() ? "tcp" : "sg") + :"buf"), + &trp)); DEBUG1("Create a client socket from plugin %s",gras_if_RL() ? "tcp" : "sg"); /* defaults settings */ @@ -242,7 +258,7 @@ gras_socket_client_ext(const char *host, sock->peer_port = port; sock->peer_name = (char*)strdup(host?host:"localhost"); sock->bufSize = bufSize; - sock->raw = raw; + sock->meas = measurement; /* plugin-specific */ errcode= (*trp->socket_client)(trp, sock); @@ -309,7 +325,7 @@ void gras_socket_close(gras_socket_t sock) { return; } } - WARN0("Ignoring request to free an unknown socket"); + WARN1("Ignoring request to free an unknown socket (%p)",sock); } XBT_OUT; } @@ -375,53 +391,97 @@ char *gras_socket_peer_name(gras_socket_t sock) { return sock->peer_name; } -xbt_error_t gras_socket_raw_send(gras_socket_t peer, +xbt_error_t gras_socket_meas_send(gras_socket_t peer, unsigned int timeout, unsigned long int exp_size, unsigned long int msg_size) { xbt_error_t errcode; - char *chunk = xbt_malloc(msg_size); - int exp_sofar; + char *chunk = xbt_malloc0(msg_size); + unsigned long int exp_sofar; - xbt_assert0(peer->raw,"Asked to send raw data on a regular socket"); + XBT_CIN(trp_meas); + + xbt_assert0(peer->meas,"Asked to send measurement data on a regular socket"); + for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) { - CDEBUG5(raw_trp,"Sent %d of %lu 
(msg_size=%ld) to %s:%d", + CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); TRY(gras_trp_chunk_send(peer,chunk,msg_size)); } - CDEBUG5(raw_trp,"Sent %d of %lu (msg_size=%ld) to %s:%d", + CDEBUG5(trp_meas,"Sent %lu of %lu (msg_size=%ld) to %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); free(chunk); - return no_error;/* gras_socket_raw_exchange(peer,1,timeout,expSize,msgSize); */ + + XBT_COUT(trp_meas); + return no_error; } -xbt_error_t gras_socket_raw_recv(gras_socket_t peer, +xbt_error_t gras_socket_meas_recv(gras_socket_t peer, unsigned int timeout, unsigned long int exp_size, unsigned long int msg_size){ xbt_error_t errcode; char *chunk = xbt_malloc(msg_size); - int exp_sofar; + unsigned long int exp_sofar; + + XBT_CIN(trp_meas); + + xbt_assert0(peer->meas,"Asked to receive measurement data on a regular socket\n"); - xbt_assert0(peer->raw,"Asked to recveive raw data on a regular socket\n"); for (exp_sofar=0; exp_sofar < exp_size; exp_sofar += msg_size) { - CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d", + CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); TRY(gras_trp_chunk_recv(peer,chunk,msg_size)); } - CDEBUG5(raw_trp,"Recvd %d of %lu (msg_size=%ld) from %s:%d", + CDEBUG5(trp_meas,"Recvd %ld of %lu (msg_size=%ld) from %s:%d", exp_sofar,exp_size,msg_size, gras_socket_peer_name(peer), gras_socket_peer_port(peer)); free(chunk); - return no_error;/* gras_socket_raw_exchange(peer,0,timeout,expSize,msgSize); */ + XBT_COUT(trp_meas); + + return no_error; } +/** + * \brief Something similar to the good old accept system call. + * + * Make sure that there is someone speaking to the provided server socket. 
+ * In RL, it does an accept(2), closes the master socket, and puts the accepted + * socket in place of the provided one. In SG, it does not do anything. + * + * You should only call this on measurement sockets. It is automatically + * done for regular sockets, but you usually want more control over + * what's going on with measurement sockets. + * + * + */ +xbt_error_t gras_socket_meas_accept(gras_socket_t peer, gras_socket_t *accepted){ + xbt_error_t errcode; + gras_socket_t res; + + xbt_assert0(peer->meas, + "No need to accept on non-measurement sockets (it's automatic)"); + + if (!peer->accepting) { + /* nothing to accept here */ + *accepted=peer; + return no_error; + } + + TRY((peer->plugin->socket_accept)(peer,accepted)); + (*accepted)->meas = peer->meas; + CDEBUG1(trp_meas,"meas_accepted onto %d",(*accepted)->sd); + + return no_error; +} + + /* * Creating procdata for this module */ @@ -440,6 +500,7 @@ static void gras_trp_procdata_free(void *data) { gras_trp_procdata_t res = (gras_trp_procdata_t)data; xbt_dynar_free(&( res->sockets )); + free(res); } /*