void amok_bw_bw_init()
{
- gras_datadesc_type_t bw_request_desc, bw_res_desc;
+ xbt_datadesc_type_t bw_request_desc, bw_res_desc;
/* Build the Bandwidth datatype descriptions */
- bw_request_desc = gras_datadesc_struct("s_bw_request_t");
- gras_datadesc_struct_append(bw_request_desc, "peer",
- gras_datadesc_by_name("s_xbt_peer_t"));
- gras_datadesc_struct_append(bw_request_desc, "buf_size",
- gras_datadesc_by_name("unsigned long int"));
- gras_datadesc_struct_append(bw_request_desc, "msg_size",
- gras_datadesc_by_name("unsigned long int"));
- gras_datadesc_struct_append(bw_request_desc, "msg_amount",
- gras_datadesc_by_name("unsigned long int"));
- gras_datadesc_struct_append(bw_request_desc, "min_duration",
- gras_datadesc_by_name("double"));
- gras_datadesc_struct_close(bw_request_desc);
- bw_request_desc = gras_datadesc_ref("bw_request_t", bw_request_desc);
-
- bw_res_desc = gras_datadesc_struct("s_bw_res_t");
- gras_datadesc_struct_append(bw_res_desc, "timestamp",
- gras_datadesc_by_name("unsigned int"));
- gras_datadesc_struct_append(bw_res_desc, "seconds",
- gras_datadesc_by_name("double"));
- gras_datadesc_struct_append(bw_res_desc, "bw",
- gras_datadesc_by_name("double"));
- gras_datadesc_struct_close(bw_res_desc);
- bw_res_desc = gras_datadesc_ref("bw_res_t", bw_res_desc);
+ bw_request_desc = xbt_datadesc_struct("s_bw_request_t");
+ xbt_datadesc_struct_append(bw_request_desc, "peer",
+ xbt_datadesc_by_name("s_xbt_peer_t"));
+ xbt_datadesc_struct_append(bw_request_desc, "buf_size",
+ xbt_datadesc_by_name("unsigned long int"));
+ xbt_datadesc_struct_append(bw_request_desc, "msg_size",
+ xbt_datadesc_by_name("unsigned long int"));
+ xbt_datadesc_struct_append(bw_request_desc, "msg_amount",
+ xbt_datadesc_by_name("unsigned long int"));
+ xbt_datadesc_struct_append(bw_request_desc, "min_duration",
+ xbt_datadesc_by_name("double"));
+ xbt_datadesc_struct_close(bw_request_desc);
+ bw_request_desc = xbt_datadesc_ref("bw_request_t", bw_request_desc);
+
+ bw_res_desc = xbt_datadesc_struct("s_bw_res_t");
+ xbt_datadesc_struct_append(bw_res_desc, "timestamp",
+ xbt_datadesc_by_name("unsigned int"));
+ xbt_datadesc_struct_append(bw_res_desc, "seconds",
+ xbt_datadesc_by_name("double"));
+ xbt_datadesc_struct_append(bw_res_desc, "bw",
+ xbt_datadesc_by_name("double"));
+ xbt_datadesc_struct_close(bw_res_desc);
+ bw_res_desc = xbt_datadesc_ref("bw_res_t", bw_res_desc);
gras_msgtype_declare_rpc("BW handshake", bw_request_desc,
bw_request_desc);
* bytes in a fat pipe.
*
*/
-void amok_bw_test(gras_socket_t peer,
+void amok_bw_test(xbt_socket_t peer,
unsigned long int buf_size,
unsigned long int msg_size,
unsigned long int msg_amount,
{
/* Measurement sockets for the experiments */
- gras_socket_t measMasterIn = NULL, measIn, measOut = NULL;
- int port;
+ volatile xbt_socket_t measMasterIn = NULL, measIn, measOut = NULL;
+ volatile int port;
bw_request_t request, request_ack;
xbt_ex_t e;
int first_pass;
CATCH(e) {
measMasterIn = NULL;
if (port == 10000 - 1) {
- RETHROW0("Error caught while opening a measurement socket: %s");
+ RETHROWF("Error caught while opening a measurement socket: %s");
} else {
xbt_ex_free(e);
}
request->msg_size = msg_size;
request->msg_amount = msg_amount;
request->peer.name = NULL;
- request->peer.port = gras_socket_my_port(measMasterIn);
- DEBUG6
- ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
- gras_socket_peer_name(peer), gras_socket_peer_port(peer),
+ request->peer.port = xbt_socket_my_port(measMasterIn);
+ XBT_DEBUG
+ ("Handshaking with %s:%d to connect it back on my %d (bufsize=%lu, msg_size=%lu, msg_amount=%lu)",
+ xbt_socket_peer_name(peer), xbt_socket_peer_port(peer),
request->peer.port, request->buf_size, request->msg_size,
request->msg_amount);
TRY {
gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
}
- CATCH(e) {
- RETHROW0("Error encountered while sending the BW request: %s");
+ CATCH_ANONYMOUS {
+ RETHROWF("Error encountered while sending the BW request: %s");
}
- measIn = gras_socket_meas_accept(measMasterIn);
+ measIn = xbt_socket_meas_accept(measMasterIn);
TRY {
- measOut = gras_socket_client_ext(gras_socket_peer_name(peer),
+ measOut = gras_socket_client_ext(xbt_socket_peer_name(peer),
request_ack->peer.port,
request->buf_size, 1);
}
- CATCH(e) {
- RETHROW2
+ CATCH_ANONYMOUS {
+ RETHROWF
("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
- gras_socket_peer_name(peer), request_ack->peer.port);
+ xbt_socket_peer_name(peer), request_ack->peer.port);
}
- DEBUG2
- ("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
+ XBT_DEBUG
+ ("Got ACK; conduct the experiment (msg_size = %lu, msg_amount=%lu)",
request->msg_size, request->msg_amount);
*sec = 0;
((request->msg_size / ((double) 64 * 1024 * 1024))
* request->msg_amount) + 1;
- xbt_assert0(new_amount > request->msg_amount,
+ xbt_assert(new_amount > request->msg_amount,
"Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
request->msg_amount = new_amount;
request->msg_size = 64 * 1024 * 1024;
}
- VERB5
+ XBT_VERB
("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
meas_duration, min_duration, request->msg_size,
request->msg_amount,
first_pass = 0;
*sec = gras_os_time();
TRY {
- gras_socket_meas_send(measOut, 120, request->msg_size,
+ xbt_socket_meas_send(measOut, 120, request->msg_size,
request->msg_amount);
- DEBUG0("Data sent. Wait ACK");
- gras_socket_meas_recv(measIn, 120, 1, 1);
- } CATCH(e) {
+ XBT_DEBUG("Data sent. Wait ACK");
+ xbt_socket_meas_recv(measIn, 120, 1, 1);
+ }
+ CATCH_ANONYMOUS {
gras_socket_close(measOut);
gras_socket_close(measMasterIn);
gras_socket_close(measIn);
- RETHROW0("Unable to conduct the experiment: %s");
+ RETHROWF("Unable to conduct the experiment: %s");
}
*sec = gras_os_time() - *sec;
if (*sec != 0.0) {
((double) request->msg_size) * ((double) request->msg_amount) /
(*sec);
}
- DEBUG1("Experiment done ; it took %f sec", *sec);
+ XBT_DEBUG("Experiment done ; it took %f sec", *sec);
if (*sec <= 0) {
- CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
+ XBT_CRITICAL("Nonpositive value (%f) found for BW test time.", *sec);
}
} while (*sec < min_duration);
- DEBUG2
+ XBT_DEBUG
("This measurement was long enough (%f sec; found %f b/s). Stop peer",
*sec, *bw);
gras_msg_send(peer, "BW stop", NULL);
*/
int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload)
{
- gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
- gras_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL;
- bw_request_t request = *(bw_request_t *) payload;
+ xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
+ volatile xbt_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL;
+ volatile bw_request_t request = *(bw_request_t *) payload;
bw_request_t answer;
xbt_ex_t e;
int port;
gras_msg_cb_ctx_t ctx_reask;
static xbt_dynar_t msgtwaited = NULL;
- DEBUG5
+ XBT_DEBUG
("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
- gras_socket_peer_name(expeditor), request->peer.port,
+ xbt_socket_peer_name(expeditor), request->peer.port,
request->buf_size, request->msg_size, request->msg_amount);
/* Build our answer */
xbt_ex_free(e);
else
/* FIXME: tell error to remote */
- RETHROW0
+ RETHROWF
("Error encountered while opening a measurement server socket: %s");
}
}
answer->buf_size = request->buf_size;
answer->msg_size = request->msg_size;
answer->msg_amount = request->msg_amount;
- answer->peer.port = gras_socket_my_port(measMasterIn);
+ answer->peer.port = xbt_socket_my_port(measMasterIn);
TRY {
gras_msg_rpcreturn(60, ctx, &answer);
}
- CATCH(e) {
+ CATCH_ANONYMOUS {
gras_socket_close(measMasterIn);
/* FIXME: tell error to remote */
- RETHROW0("Error encountered while sending the answer: %s");
+ RETHROWF("Error encountered while sending the answer: %s");
}
/* Don't connect asap to leave time to other side to enter the accept() */
TRY {
- measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
+ measOut = gras_socket_client_ext(xbt_socket_peer_name(expeditor),
request->peer.port,
request->buf_size, 1);
}
- CATCH(e) {
- RETHROW2
+ CATCH_ANONYMOUS {
+ RETHROWF
("Error encountered while opening a measurement socket back to %s:%d : %s",
- gras_socket_peer_name(expeditor), request->peer.port);
+ xbt_socket_peer_name(expeditor), request->peer.port);
/* FIXME: tell error to remote */
}
TRY {
- measIn = gras_socket_meas_accept(measMasterIn);
- DEBUG4
+ measIn = xbt_socket_meas_accept(measMasterIn);
+ XBT_DEBUG
("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
answer->buf_size, answer->msg_size, answer->msg_amount,
answer->peer.port);
}
- CATCH(e) {
+ CATCH_ANONYMOUS {
gras_socket_close(measMasterIn);
gras_socket_close(measIn);
gras_socket_close(measOut);
/* FIXME: tell error to remote ? */
- RETHROW0("Error encountered while opening the meas socket: %s");
+ RETHROWF("Error encountered while opening the meas socket: %s");
}
if (!msgtwaited) {
}
while (tooshort) {
- void *payload;
+ void *payloadgot;
int msggot;
TRY {
- gras_socket_meas_recv(measIn, 120, request->msg_size,
+ xbt_socket_meas_recv(measIn, 120, request->msg_size,
request->msg_amount);
- gras_socket_meas_send(measOut, 120, 1, 1);
- } CATCH(e) {
+ xbt_socket_meas_send(measOut, 120, 1, 1);
+ }
+ CATCH_ANONYMOUS {
gras_socket_close(measMasterIn);
gras_socket_close(measIn);
gras_socket_close(measOut);
/* FIXME: tell error to remote ? */
- RETHROW0("Error encountered while receiving the experiment: %s");
+ RETHROWF("Error encountered while receiving the experiment: %s");
}
- gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payload);
+ gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payloadgot);
switch (msggot) {
case 0: /* BW stop */
tooshort = 0;
case 1: /* BW reask */
tooshort = 1;
free(request);
- request = (bw_request_t) payload;
- VERB0("Return the reasking RPC");
+ request = (bw_request_t) payloadgot;
+ XBT_VERB("Return the reasking RPC");
gras_msg_rpcreturn(60, ctx_reask, NULL);
}
gras_msg_cb_ctx_free(ctx_reask);
gras_socket_close(measOut);
free(answer);
free(request);
- VERB0("BW experiment done.");
+ XBT_VERB("BW experiment done.");
return 0;
}
double min_duration, /*OUT*/ double *sec, double *bw)
{
- gras_socket_t sock;
+ xbt_socket_t sock;
/* The request */
bw_request_t request;
bw_res_t result;
- DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name, from_port,
+ XBT_DEBUG("Ask for a BW test between %s:%u and %s:%u", from_name, from_port,
to_name, to_port);
gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result);
if (bw)
*bw = result->bw;
- VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
+ XBT_VERB("BW test (%s:%u -> %s:%u) took %f sec (%f kb/s)",
from_name, from_port, to_name, to_port,
result->sec, ((double) result->bw) / 1024.0);
/* specification of the test to run, and our answer */
bw_request_t request = *(bw_request_t *) payload;
bw_res_t result = xbt_new0(s_bw_res_t, 1);
- gras_socket_t peer, asker;
+ xbt_socket_t peer, asker;
asker = gras_msg_cb_ctx_from(ctx);
- VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
- gras_socket_peer_name(asker), gras_socket_peer_port(asker),
+ XBT_VERB("Asked by %s:%d to conduct a bw XP with %s:%d (request: %lu %lu)",
+ xbt_socket_peer_name(asker), xbt_socket_peer_port(asker),
request->peer.name, request->peer.port,
request->msg_size, request->msg_amount);
peer = gras_socket_client(request->peer.name, request->peer.port);