/** @brief module initialization; all participating nodes must run this */
void amok_bw_init(void) {
- amok_base_init();
-
if (! _amok_bw_initialized) {
amok_bw_bw_init();
amok_bw_sat_init();
int port;
bw_request_t request,request_ack;
xbt_ex_t e;
+ int first_pass;
+ int nb_messages = (exp_size % msg_size == 0) ?
+ (exp_size / msg_size) : (exp_size / msg_size + 1);
for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
TRY {
request=xbt_new0(s_bw_request_t,1);
request->buf_size=buf_size;
- request->exp_size=exp_size;
+ request->exp_size=msg_size * nb_messages;
request->msg_size=msg_size;
request->peer.name = NULL;
request->peer.port = gras_socket_my_port(measMasterIn);
- DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)",
+ DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)",
gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
buf_size,request->buf_size);
RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
gras_socket_peer_name(peer),request_ack->peer.port);
}
- DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
+ DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
+ request->exp_size, request->msg_size);
*sec = 0;
+ first_pass = 1;
do {
- if (*sec>0) {
+ if (first_pass == 0) {
double meas_duration=*sec;
- request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
- request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
+ double increase;
+ if (*sec != 0.0 ) {
+ increase = (min_duration / meas_duration) * 1.1;
+ } else {
+ increase = 4;
+ }
+ /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
+ if (increase > 20)
+ increase = 20;
+
+ request->msg_size = request->msg_size * increase;
+
+ /* Do not do too large experiments messages or the sensors will start to swap to store one of them */
if (request->msg_size > 64*1024*1024)
request->msg_size = 64*1024*1024;
- VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
+ if (request->exp_size > request->msg_size * nb_messages)
+ CRITICAL0("overflow on the experiment size! You must have a *really* fat pipe. Please fix your platform");
+ else
+ request->exp_size = request->msg_size * nb_messages;
+
+ VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%lu msg_size=%lu (got %fkb/s)",
meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
}
+ first_pass = 0;
*sec=gras_os_time();
TRY {
gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
gras_socket_close(measIn);
RETHROW0("Unable to conduct the experiment: %s");
}
- DEBUG0("Experiment done");
-
*sec = gras_os_time() - *sec;
- *bw = ((double)request->exp_size) / *sec;
+ if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
+ DEBUG1("Experiment done ; it took %f sec", *sec);
+ if (*sec <= 0) {
+ CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
+ }
+
} while (*sec < min_duration);
DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
/* The request */
bw_request_t request;
bw_res_t result;
-
request=xbt_new0(s_bw_request_t,1);
request->buf_size=buf_size;
request->exp_size=exp_size;
request->msg_size=msg_size;
request->min_duration = min_duration;
+
request->peer.name = (char*)to_name;
request->peer.port = to_port;
+
sock = gras_socket_client(from_name,from_port);
+
+
+
DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
-
gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
-
+
if (sec)
*sec=result->sec;
if (bw)
asker=gras_msg_cb_ctx_from(ctx);
VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",
gras_socket_peer_name(asker),gras_socket_peer_port(asker),
+
request->peer.name,request->peer.port);
peer = gras_socket_client(request->peer.name,request->peer.port);
amok_bw_test(peer,
request->buf_size,request->exp_size,request->msg_size,
request->min_duration,
&(result->sec),&(result->bw));
-
+
gras_msg_rpcreturn(240,ctx,&result);
gras_os_sleep(1);