From 49e03b36189c51cf0a19fca59ce81a84bc21b5b4 Mon Sep 17 00:00:00 2001 From: mquinson Date: Thu, 1 Jun 2006 19:58:37 +0000 Subject: [PATCH] In amok_bw_test, it is now possible to ask that the measurement lasts at least N seconds (N being a double). If it is not the case with the provided msg_size, it will get adjusted and a new test is made (using the same measurement sockets so you won't pay the handshaking twice). This mecanism is still to be offered in amok_bw_request(), but I'm on it. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@2334 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/amok/Bandwidth/bandwidth.c | 94 +++++++++++++++++++++----- src/amok/Bandwidth/bandwidth_private.h | 1 + 2 files changed, 78 insertions(+), 17 deletions(-) diff --git a/src/amok/Bandwidth/bandwidth.c b/src/amok/Bandwidth/bandwidth.c index 01d11279c4..519e5363cd 100644 --- a/src/amok/Bandwidth/bandwidth.c +++ b/src/amok/Bandwidth/bandwidth.c @@ -68,6 +68,8 @@ void amok_bw_bw_init() { gras_datadesc_by_name("unsigned long int")); gras_datadesc_struct_append(bw_request_desc,"msg_size", gras_datadesc_by_name("unsigned long int")); + gras_datadesc_struct_append(bw_request_desc,"min_duration", + gras_datadesc_by_name("double")); gras_datadesc_struct_close(bw_request_desc); bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc); @@ -79,6 +81,10 @@ void amok_bw_bw_init() { bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc); gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc); + + gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL); + gras_msgtype_declare("BW stop", NULL); + gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc); } void amok_bw_bw_join() { @@ -101,6 +107,7 @@ void amok_bw_bw_leave() { * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change) * \arg exp_size: Total size of data sent across the network * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent. + * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second. * \arg sec: where the result (in seconds) should be stored. * \arg bw: observed Bandwidth (in byte/s) * @@ -113,6 +120,7 @@ void amok_bw_test(gras_socket_t peer, unsigned long int buf_size, unsigned long int exp_size, unsigned long int msg_size, + double min_duration, /*OUT*/ double *sec, double *bw) { /* Measurement sockets for the experiments */ @@ -162,19 +170,36 @@ void amok_bw_test(gras_socket_t peer, } DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size); - *sec=gras_os_time(); - TRY { - gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size); - gras_socket_meas_recv(measIn,120,1,1); - } CATCH(e) { - gras_socket_close(measOut); - gras_socket_close(measMasterIn); - gras_socket_close(measIn); - RETHROW0("Unable to conduct the experiment: %s"); - } + *sec = 0; + do { + if (*sec>0) { + double meas_duration=*sec; + request->msg_size = request->msg_size * (min_duration / meas_duration); + + DEBUG3("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%ld", + meas_duration,min_duration,request->msg_size); + gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL); + } + + *sec=gras_os_time(); + TRY { + gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size); + DEBUG0("Data sent. Wait ACK"); + gras_socket_meas_recv(measIn,120,1,1); + } CATCH(e) { + gras_socket_close(measOut); + gras_socket_close(measMasterIn); + gras_socket_close(measIn); + RETHROW0("Unable to conduct the experiment: %s"); + } + DEBUG0("Experiment done"); + + *sec = gras_os_time() - *sec; + *bw = ((double)exp_size) / *sec; + } while (*sec < min_duration); - *sec = gras_os_time() - *sec; - *bw = ((double)exp_size) / *sec; + DEBUG0("This measurement was long enough. Stop peer"); + gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL); free(request_ack); free(request); @@ -201,6 +226,9 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, bw_request_t answer; xbt_ex_t e; int port; + int tooshort = 1; + gras_msg_cb_ctx_t ctx_reask; + static xbt_dynar_t msgtwaited=NULL; DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)", gras_socket_peer_name(expeditor),request->host.port, @@ -251,15 +279,46 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, measIn = gras_socket_meas_accept(measMasterIn); DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d", answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port); - - gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size); - gras_socket_meas_send(measOut,120,1,1); } CATCH(e) { gras_socket_close(measMasterIn); gras_socket_close(measIn); gras_socket_close(measOut); /* FIXME: tell error to remote ? */ - RETHROW0("Error encountered while receiving the experiment: %s"); + RETHROW0("Error encountered while opening the meas socket: %s"); + } + + if (!msgtwaited) { + msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL); + xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop")); + xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask")); + } + + while (tooshort) { + void *payload; + int msggot; + TRY { + gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size); + gras_socket_meas_send(measOut,120,1,1); + DEBUG0("ACK sent"); + } CATCH(e) { + gras_socket_close(measMasterIn); + gras_socket_close(measIn); + gras_socket_close(measOut); + /* FIXME: tell error to remote ? */ + RETHROW0("Error encountered while receiving the experiment: %s"); + } + gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload); + switch(msggot) { + case 0: /* BW stop */ + tooshort = 0; + break; + case 1: /* BW reask */ + tooshort = 1; + free(request); + request = (bw_request_t)payload; + gras_msg_rpcreturn(60,ctx_reask,NULL); + } + gras_msg_cb_ctx_free(ctx_reask); } if (measIn != measMasterIn) @@ -268,7 +327,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, gras_socket_close(measOut); free(answer); free(request); - DEBUG0("BW experiment done."); + VERB0("BW experiment done."); return 1; } @@ -344,6 +403,7 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, peer = gras_socket_client(request->host.name,request->host.port); amok_bw_test(peer, request->buf_size,request->exp_size,request->msg_size, + 0, &(result->sec),&(result->bw)); gras_msg_rpcreturn(240,ctx,&result); diff --git a/src/amok/Bandwidth/bandwidth_private.h b/src/amok/Bandwidth/bandwidth_private.h index 0fa86f0ffe..a90431d6c5 100644 --- a/src/amok/Bandwidth/bandwidth_private.h +++ b/src/amok/Bandwidth/bandwidth_private.h @@ -37,6 +37,7 @@ typedef struct { unsigned long int buf_size; unsigned long int exp_size; unsigned long int msg_size; + double min_duration; } s_bw_request_t,*bw_request_t; /* Result of a BW experiment (payload when answering). */ -- 2.20.1