-xbt_error_t amok_bw_test(gras_socket_t peer,
- unsigned int buf_size,unsigned int exp_size,unsigned int msg_size,
- /*OUT*/ double *sec, double *bw) {
- gras_socket_t rawIn,rawOut; /* raw sockets for the experiments */
- gras_socket_t sock_dummy; /* ignored arg to msg_wait */
- int port;
- xbt_error_t errcode;
- bw_request_t request,request_ack;
-
- for (port = 5000, errcode = system_error;
- errcode == system_error;
- errcode = gras_socket_server_ext(++port,buf_size,1,&rawIn));
- if (errcode != no_error) {
- ERROR1("Error %s encountered while opening a raw socket",
- xbt_error_name(errcode));
- return errcode;
- }
-
- request=xbt_new0(s_bw_request_t,1);
- request->buf_size=buf_size;
- request->exp_size=exp_size;
- request->msg_size=msg_size;
- request->host.name = NULL;
- request->host.port = gras_socket_my_port(rawIn);
- INFO1("Send an handshake to get the dude connect to port %d on me", request->host.port);
-
- if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
- ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
- return errcode;
- }
- if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),&sock_dummy,&request_ack))) {
- ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
- xbt_error_name(errcode));
- return errcode;
- }
-
- /* FIXME: What if there is a remote error? */
-
- if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),request_ack->host.port, buf_size,1,&rawOut))) {
- ERROR3("Error %s encountered while opening the raw socket to %s:%d for BW test\n",
- xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
- return errcode;
- }
+void amok_bw_test(xbt_socket_t peer,
+ unsigned long int buf_size,
+ unsigned long int msg_size,
+ unsigned long int msg_amount,
+ double min_duration, /*OUT*/ double *sec, double *bw)
+{
+
+ /* Measurement sockets for the experiments */
+ volatile xbt_socket_t measMasterIn = NULL, measIn, measOut = NULL;
+ volatile int port;
+ bw_request_t request, request_ack;
+ xbt_ex_t e;
+ int first_pass;
+
+ for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
+ TRY {
+ measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
+ }
+ CATCH(e) {
+ measMasterIn = NULL;
+ if (port == 10000 - 1) {
+ RETHROWF("Error caught while opening a measurement socket: %s");
+ } else {
+ xbt_ex_free(e);
+ }
+ }
+ }
+
+ request = xbt_new0(s_bw_request_t, 1);
+ request->buf_size = buf_size;
+ request->msg_size = msg_size;
+ request->msg_amount = msg_amount;
+ request->peer.name = NULL;
+ request->peer.port = xbt_socket_my_port(measMasterIn);
+ XBT_DEBUG
+ ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
+ xbt_socket_peer_name(peer), xbt_socket_peer_port(peer),
+ request->peer.port, request->buf_size, request->msg_size,
+ request->msg_amount);
+
+ TRY {
+ gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
+ }
+ CATCH_ANONYMOUS {
+ RETHROWF("Error encountered while sending the BW request: %s");
+ }
+ measIn = xbt_socket_meas_accept(measMasterIn);
+
+ TRY {
+ measOut = gras_socket_client_ext(xbt_socket_peer_name(peer),
+ request_ack->peer.port,
+ request->buf_size, 1);
+ }
+ CATCH_ANONYMOUS {
+ RETHROWF
+ ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
+ xbt_socket_peer_name(peer), request_ack->peer.port);
+ }
+ XBT_DEBUG
+ ("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
+ request->msg_size, request->msg_amount);
+
+ *sec = 0;
+ first_pass = 1;
+ do {
+ if (first_pass == 0) {
+ double meas_duration = *sec;
+ double increase;
+ if (*sec != 0.0) {
+ increase = (min_duration / meas_duration) * 1.1;
+ } else {
+ increase = 4;
+ }
+ /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
+ if (increase > 20)
+ increase = 20;
+
+ request->msg_size = request->msg_size * increase;
+
+ /* Do not do too large experiments messages or the sensors
+ will start to swap to store one of them.
+ And then increase the number of messages to compensate (check for overflow there, too) */
+ if (request->msg_size > 64 * 1024 * 1024) {
+ unsigned long int new_amount =
+ ((request->msg_size / ((double) 64 * 1024 * 1024))
+ * request->msg_amount) + 1;
+
+ xbt_assert(new_amount > request->msg_amount,
+ "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
+ request->msg_amount = new_amount;
+
+ request->msg_size = 64 * 1024 * 1024;
+ }
+
+ XBT_VERB
+ ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
+ meas_duration, min_duration, request->msg_size,
+ request->msg_amount,
+ ((double) request->msg_size) * ((double) request->msg_amount /
+ (*sec) / 1024.0 / 1024.0));
+
+ gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
+ }
+
+ first_pass = 0;
+ *sec = gras_os_time();
+ TRY {
+ xbt_socket_meas_send(measOut, 120, request->msg_size,
+ request->msg_amount);
+ XBT_DEBUG("Data sent. Wait ACK");
+ xbt_socket_meas_recv(measIn, 120, 1, 1);
+ }
+ CATCH_ANONYMOUS {
+ gras_socket_close(measOut);
+ gras_socket_close(measMasterIn);
+ gras_socket_close(measIn);
+ RETHROWF("Unable to conduct the experiment: %s");
+ }
+ *sec = gras_os_time() - *sec;
+ if (*sec != 0.0) {
+ *bw =
+ ((double) request->msg_size) * ((double) request->msg_amount) /
+ (*sec);
+ }
+ XBT_DEBUG("Experiment done ; it took %f sec", *sec);
+ if (*sec <= 0) {
+ XBT_CRITICAL("Nonpositive value (%f) found for BW test time.", *sec);
+ }
+
+ } while (*sec < min_duration);
+
+ XBT_DEBUG
+ ("This measurement was long enough (%f sec; found %f b/s). Stop peer",
+ *sec, *bw);
+ gras_msg_send(peer, "BW stop", NULL);
+