X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a692599504b1ee4d59a1f4079573f8f5b703d64f..a2d9f61a7443c403dc00bcddfd1610bfd0ea1590:/src/amok/Bandwidth/bandwidth.c diff --git a/src/amok/Bandwidth/bandwidth.c b/src/amok/Bandwidth/bandwidth.c index fe7394466c..79ab6443c1 100644 --- a/src/amok/Bandwidth/bandwidth.c +++ b/src/amok/Bandwidth/bandwidth.c @@ -25,8 +25,6 @@ static short _amok_bw_initialized = 0; /** @brief module initialization; all participating nodes must run this */ void amok_bw_init(void) { - amok_base_init(); - if (! _amok_bw_initialized) { amok_bw_bw_init(); amok_bw_sat_init(); @@ -60,8 +58,8 @@ void amok_bw_bw_init() { /* Build the Bandwidth datatype descriptions */ bw_request_desc = gras_datadesc_struct("s_bw_request_t"); - gras_datadesc_struct_append(bw_request_desc,"host", - gras_datadesc_by_name("s_xbt_host_t")); + gras_datadesc_struct_append(bw_request_desc,"peer", + gras_datadesc_by_name("s_xbt_peer_t")); gras_datadesc_struct_append(bw_request_desc,"buf_size", gras_datadesc_by_name("unsigned long int")); gras_datadesc_struct_append(bw_request_desc,"exp_size", @@ -114,6 +112,14 @@ void amok_bw_bw_leave() { * Conduct a bandwidth test from the local process to the given peer. * This call is blocking until the end of the experiment. * + * If the asked experiment lasts less than \a min_duration, another one will be + * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by + * (\a min_duration / measured_duration) (plus 10% to be sure to eventually + * reach the \a min_duration). In that case, the reported bandwidth and + * duration are the ones of the last run. \a msg_size cannot go over 64Mb + * because we need to malloc a block of this size in RL to conduct the + * experiment, and we still don't want to visit the swap. + * * Results are reported in last args, and sizes are in byte. */ void amok_bw_test(gras_socket_t peer, @@ -128,6 +134,7 @@ void amok_bw_test(gras_socket_t peer, int port; bw_request_t request,request_ack; xbt_ex_t e; + int first_pass; for (port = 5000; port < 10000 && measMasterIn == NULL; port++) { TRY { @@ -146,10 +153,10 @@ void amok_bw_test(gras_socket_t peer, request->buf_size=buf_size; request->exp_size=exp_size; request->msg_size=msg_size; - request->host.name = NULL; - request->host.port = gras_socket_my_port(measMasterIn); - DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", - gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port, + request->peer.name = NULL; + request->peer.port = gras_socket_my_port(measMasterIn); + DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)", + gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port, buf_size,request->buf_size); TRY { @@ -162,28 +169,37 @@ void amok_bw_test(gras_socket_t peer, TRY { measOut=gras_socket_client_ext(gras_socket_peer_name(peer), - request_ack->host.port, + request_ack->peer.port, request->buf_size,1); } CATCH(e) { RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s", - gras_socket_peer_name(peer),request_ack->host.port); + gras_socket_peer_name(peer),request_ack->peer.port); } - DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size); + DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)", + request->exp_size, request->msg_size); *sec = 0; + first_pass = 1; do { - if (*sec>0) { + if (first_pass == 0) { double meas_duration=*sec; - request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1; - request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1; - + if (*sec != 0.0 ) { + request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1; + request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1; + } else { + request->exp_size = request->exp_size * 4; + request->msg_size = request->msg_size * 4; + } + + if (request->msg_size > 64*1024*1024) + request->msg_size = 64*1024*1024; - DEBUG5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)", + VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)", meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024); gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL); - DEBUG0("Peer is ready for another round of fun"); } + first_pass = 0; *sec=gras_os_time(); TRY { gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size); @@ -195,10 +211,13 @@ void amok_bw_test(gras_socket_t peer, gras_socket_close(measIn); RETHROW0("Unable to conduct the experiment: %s"); } - DEBUG0("Experiment done"); - *sec = gras_os_time() - *sec; - *bw = ((double)request->exp_size) / *sec; + if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; } + DEBUG1("Experiment done ; it took %f sec", *sec); + if (*sec <= 0) { + CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec); + } + } while (*sec < min_duration); DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer", @@ -235,7 +254,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, static xbt_dynar_t msgtwaited=NULL; DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)", - gras_socket_peer_name(expeditor),request->host.port, + gras_socket_peer_name(expeditor),request->peer.port, request->buf_size,request->exp_size,request->msg_size); /* Build our answer */ @@ -257,7 +276,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, answer->buf_size=request->buf_size; answer->exp_size=request->exp_size; answer->msg_size=request->msg_size; - answer->host.port=gras_socket_my_port(measMasterIn); + answer->peer.port=gras_socket_my_port(measMasterIn); TRY { gras_msg_rpcreturn(60,ctx,&answer); @@ -271,18 +290,18 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, /* Don't connect asap to leave time to other side to enter the accept() */ TRY { measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor), - request->host.port, + request->peer.port, request->buf_size,1); } CATCH(e) { RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", - gras_socket_peer_name(expeditor),request->host.port); + gras_socket_peer_name(expeditor),request->peer.port); /* FIXME: tell error to remote */ } TRY { measIn = gras_socket_meas_accept(measMasterIn); DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d", - answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port); + answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port); } CATCH(e) { gras_socket_close(measMasterIn); gras_socket_close(measIn); @@ -301,10 +320,8 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload; int msggot; TRY { - DEBUG0("Recv / Send the experiment"); gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size); gras_socket_meas_send(measOut,120,1,1); - DEBUG0("ACK sent"); } CATCH(e) { gras_socket_close(measMasterIn); gras_socket_close(measIn); @@ -338,11 +355,11 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, } /** - * \brief request a bandwidth measurement between two remote hosts + * \brief request a bandwidth measurement between two remote peers * - * \arg from_name: Name of the first host + * \arg from_name: Name of the first peer * \arg from_port: port on which the first process is listening for messages - * \arg to_name: Name of the second host + * \arg to_name: Name of the second peer * \arg to_port: port on which the second process is listening (for messages, do not * give a measurement socket here. The needed measurement sockets will be created * automatically and negociated between the peers) @@ -352,7 +369,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, * \arg sec: where the result (in seconds) should be stored. * \arg bw: observed Bandwidth (in byte/s) * - * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port. + * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port. * This call is blocking until the end of the experiment. * * Results are reported in last args, and sizes are in bytes. @@ -369,21 +386,24 @@ void amok_bw_request(const char* from_name,unsigned int from_port, /* The request */ bw_request_t request; bw_res_t result; - request=xbt_new0(s_bw_request_t,1); request->buf_size=buf_size; request->exp_size=exp_size; request->msg_size=msg_size; request->min_duration = min_duration; - request->host.name = (char*)to_name; - request->host.port = to_port; + + request->peer.name = (char*)to_name; + request->peer.port = to_port; + sock = gras_socket_client(from_name,from_port); + + + DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port); - gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result); - + if (sec) *sec=result->sec; if (bw) @@ -409,18 +429,19 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, asker=gras_msg_cb_ctx_from(ctx); VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", gras_socket_peer_name(asker),gras_socket_peer_port(asker), - request->host.name,request->host.port); - peer = gras_socket_client(request->host.name,request->host.port); + + request->peer.name,request->peer.port); + peer = gras_socket_client(request->peer.name,request->peer.port); amok_bw_test(peer, request->buf_size,request->exp_size,request->msg_size, request->min_duration, &(result->sec),&(result->bw)); - + gras_msg_rpcreturn(240,ctx,&result); gras_os_sleep(1); gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */ - free(request->host.name); + free(request->peer.name); free(request); free(result); @@ -428,23 +449,23 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, } /** \brief builds a matrix of results of bandwidth measurement */ -double * amok_bw_matrix(xbt_dynar_t hosts, +double * amok_bw_matrix(xbt_dynar_t peers, int buf_size_bw, int exp_size_bw, int msg_size_bw, double min_duration) { double sec; /* construction of matrices for bandwith and latency */ - int i,j,len=xbt_dynar_length(hosts); + int i,j,len=xbt_dynar_length(peers); double *matrix_res = xbt_new0(double, len*len); - xbt_host_t h1,h2; + xbt_peer_t p1,p2; - xbt_dynar_foreach (hosts,i,h1) { - xbt_dynar_foreach (hosts,j,h2) { + xbt_dynar_foreach (peers,i,p1) { + xbt_dynar_foreach (peers,j,p2) { if (i!=j) { /* Mesurements of Bandwidth */ - amok_bw_request(h1->name,h1->port,h2->name,h2->port, + amok_bw_request(p1->name,p1->port,p2->name,p2->port, buf_size_bw,exp_size_bw,msg_size_bw,min_duration, &sec,&matrix_res[i*len + j]); }