/** @brief module initialization; all participating nodes must run this */
void amok_bw_init(void) {
- amok_base_init();
-
if (! _amok_bw_initialized) {
amok_bw_bw_init();
amok_bw_sat_init();
/* Build the Bandwidth datatype descriptions */
bw_request_desc = gras_datadesc_struct("s_bw_request_t");
- gras_datadesc_struct_append(bw_request_desc,"host",
- gras_datadesc_by_name("s_xbt_host_t"));
+ gras_datadesc_struct_append(bw_request_desc,"peer",
+ gras_datadesc_by_name("s_xbt_peer_t"));
gras_datadesc_struct_append(bw_request_desc,"buf_size",
gras_datadesc_by_name("unsigned long int"));
gras_datadesc_struct_append(bw_request_desc,"exp_size",
* Conduct a bandwidth test from the local process to the given peer.
* This call is blocking until the end of the experiment.
*
+ * If the requested experiment lasts less than \a min_duration, another one
+ * will be launched. Sizes (both \a exp_size and \a msg_size) will be multiplied
+ * by (\a min_duration / measured_duration) (plus 10% to be sure to eventually
+ * reach the \a min_duration). In that case, the reported bandwidth and
+ * duration are the ones of the last run. \a msg_size cannot go over 64MB
+ * because we need to malloc a block of this size in RL to conduct the
+ * experiment, and we still don't want to visit the swap.
+ *
* Results are reported in last args, and sizes are in byte.
*/
void amok_bw_test(gras_socket_t peer,
int port;
bw_request_t request,request_ack;
xbt_ex_t e;
+ int first_pass;
+ int nb_messages = (exp_size % msg_size == 0) ?
+ (exp_size / msg_size) : (exp_size / msg_size + 1);
for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
TRY {
request=xbt_new0(s_bw_request_t,1);
request->buf_size=buf_size;
- request->exp_size=exp_size;
+ request->exp_size=msg_size * nb_messages;
request->msg_size=msg_size;
- request->host.name = NULL;
- request->host.port = gras_socket_my_port(measMasterIn);
- DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)",
- gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
+ request->peer.name = NULL;
+ request->peer.port = gras_socket_my_port(measMasterIn);
+ DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)",
+ gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
buf_size,request->buf_size);
TRY {
TRY {
measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
- request_ack->host.port,
+ request_ack->peer.port,
request->buf_size,1);
} CATCH(e) {
RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
- gras_socket_peer_name(peer),request_ack->host.port);
+ gras_socket_peer_name(peer),request_ack->peer.port);
}
- DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
+ DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
+ request->exp_size, request->msg_size);
*sec = 0;
+ first_pass = 1;
do {
- if (*sec>0) {
+ if (first_pass == 0) {
double meas_duration=*sec;
- request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
- request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
+ if (*sec != 0.0 ) {
+ request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
+ } else {
+ request->msg_size = request->msg_size * 4;
+ }
+
+ if (request->msg_size > 64*1024*1024)
+ request->msg_size = 64*1024*1024;
+ request->exp_size = request->msg_size * nb_messages;
- DEBUG5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
+ VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
- DEBUG0("Peer is ready for another round of fun");
}
+ first_pass = 0;
*sec=gras_os_time();
TRY {
gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
gras_socket_close(measIn);
RETHROW0("Unable to conduct the experiment: %s");
}
- DEBUG0("Experiment done");
-
*sec = gras_os_time() - *sec;
- *bw = ((double)request->exp_size) / *sec;
+ if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
+ DEBUG1("Experiment done ; it took %f sec", *sec);
+ if (*sec <= 0) {
+ CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
+ }
+
} while (*sec < min_duration);
DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
static xbt_dynar_t msgtwaited=NULL;
DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
- gras_socket_peer_name(expeditor),request->host.port,
+ gras_socket_peer_name(expeditor),request->peer.port,
request->buf_size,request->exp_size,request->msg_size);
/* Build our answer */
answer->buf_size=request->buf_size;
answer->exp_size=request->exp_size;
answer->msg_size=request->msg_size;
- answer->host.port=gras_socket_my_port(measMasterIn);
+ answer->peer.port=gras_socket_my_port(measMasterIn);
TRY {
gras_msg_rpcreturn(60,ctx,&answer);
/* Don't connect asap to leave time to other side to enter the accept() */
TRY {
measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
- request->host.port,
+ request->peer.port,
request->buf_size,1);
} CATCH(e) {
RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
- gras_socket_peer_name(expeditor),request->host.port);
+ gras_socket_peer_name(expeditor),request->peer.port);
/* FIXME: tell error to remote */
}
TRY {
measIn = gras_socket_meas_accept(measMasterIn);
DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
- answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
+ answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
} CATCH(e) {
gras_socket_close(measMasterIn);
gras_socket_close(measIn);
void *payload;
int msggot;
TRY {
- DEBUG0("Recv / Send the experiment");
gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
gras_socket_meas_send(measOut,120,1,1);
- DEBUG0("ACK sent");
} CATCH(e) {
gras_socket_close(measMasterIn);
gras_socket_close(measIn);
}
/**
- * \brief request a bandwidth measurement between two remote hosts
+ * \brief request a bandwidth measurement between two remote peers
*
- * \arg from_name: Name of the first host
+ * \arg from_name: Name of the first peer
* \arg from_port: port on which the first process is listening for messages
- * \arg to_name: Name of the second host
+ * \arg to_name: Name of the second peer
* \arg to_port: port on which the second process is listening (for messages, do not
* give a measurement socket here. The needed measurement sockets will be created
* automatically and negotiated between the peers)
* \arg sec: where the result (in seconds) should be stored.
* \arg bw: observed Bandwidth (in byte/s)
*
- * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
+ * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
* This call is blocking until the end of the experiment.
*
* Results are reported in last args, and sizes are in bytes.
/* The request */
bw_request_t request;
bw_res_t result;
-
request=xbt_new0(s_bw_request_t,1);
request->buf_size=buf_size;
request->exp_size=exp_size;
request->msg_size=msg_size;
request->min_duration = min_duration;
- request->host.name = (char*)to_name;
- request->host.port = to_port;
+
+ request->peer.name = (char*)to_name;
+ request->peer.port = to_port;
+
sock = gras_socket_client(from_name,from_port);
+
+
+
DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
-
gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
-
+
if (sec)
*sec=result->sec;
if (bw)
asker=gras_msg_cb_ctx_from(ctx);
VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",
gras_socket_peer_name(asker),gras_socket_peer_port(asker),
- request->host.name,request->host.port);
- peer = gras_socket_client(request->host.name,request->host.port);
+
+ request->peer.name,request->peer.port);
+ peer = gras_socket_client(request->peer.name,request->peer.port);
amok_bw_test(peer,
request->buf_size,request->exp_size,request->msg_size,
request->min_duration,
&(result->sec),&(result->bw));
-
+
gras_msg_rpcreturn(240,ctx,&result);
gras_os_sleep(1);
gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
- free(request->host.name);
+ free(request->peer.name);
free(request);
free(result);
}
/** \brief builds a matrix of results of bandwidth measurement */
-double * amok_bw_matrix(xbt_dynar_t hosts,
+double * amok_bw_matrix(xbt_dynar_t peers,
int buf_size_bw, int exp_size_bw, int msg_size_bw,
double min_duration) {
double sec;
/* construction of matrices for bandwidth and latency */
- int i,j,len=xbt_dynar_length(hosts);
+ int i,j,len=xbt_dynar_length(peers);
double *matrix_res = xbt_new0(double, len*len);
- xbt_host_t h1,h2;
+ xbt_peer_t p1,p2;
- xbt_dynar_foreach (hosts,i,h1) {
- xbt_dynar_foreach (hosts,j,h2) {
+ xbt_dynar_foreach (peers,i,p1) {
+ xbt_dynar_foreach (peers,j,p2) {
if (i!=j) {
/* Measurements of Bandwidth */
- amok_bw_request(h1->name,h1->port,h2->name,h2->port,
+ amok_bw_request(p1->name,p1->port,p2->name,p2->port,
buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
&sec,&matrix_res[i*len + j]);
}