+/**
+ * \brief bandwidth measurement between localhost and \e peer
+ *
+ * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
+ * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
+ * \arg msg_size: Size of each message sent.
+ * \arg msg_amount: Amount of such messages to exchange
+ * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
+ * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
+ * \arg bw: observed Bandwidth (in byte/s)
+ *
+ * Conduct a bandwidth test from the local process to the given peer.
+ * This call is blocking until the end of the experiment.
+ *
+ * If the asked experiment lasts less than \a min_duration, another one will be
+ * launched (and others, if needed). msg_size will be multiplicated by
+ * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
+ * reach the \a min_duration). In that case, the reported bandwidth and
+ * duration are the ones of the last run. \a msg_size cannot go over 64Mb
+ * because we need to malloc a block of this size in RL to conduct the
+ * experiment, and we still don't want to visit the swap. In such case, the
+ * number of messages is increased instead of their size.
+ *
+ * Results are reported in last args, and sizes are in byte.
+ *
+ * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
+ * as the total amount of data to send and the msg_size. This
+ * was changed for the fool wanting to send more than MAXINT
+ * bytes in a fat pipe.
+ *
+ */
+void amok_bw_test(gras_socket_t peer,
+ unsigned long int buf_size,
+ unsigned long int msg_size,
+ unsigned long int msg_amount,
+ double min_duration, /*OUT*/ double *sec, double *bw)
+{
+
+ /* Measurement sockets for the experiments */
+ volatile gras_socket_t measMasterIn = NULL, measIn, measOut = NULL;
+ volatile int port;
+ bw_request_t request, request_ack;
+ xbt_ex_t e;
+ int first_pass;
+
+ for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
+ TRY {
+ measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
+ }
+ CATCH(e) {
+ measMasterIn = NULL;
+ if (port == 10000 - 1) {
+ RETHROWF("Error caught while opening a measurement socket: %s");
+ } else {
+ xbt_ex_free(e);
+ }
+ }
+ }
+
+ request = xbt_new0(s_bw_request_t, 1);
+ request->buf_size = buf_size;
+ request->msg_size = msg_size;
+ request->msg_amount = msg_amount;
+ request->peer.name = NULL;
+ request->peer.port = gras_socket_my_port(measMasterIn);
+ XBT_DEBUG
+ ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
+ gras_socket_peer_name(peer), gras_socket_peer_port(peer),
+ request->peer.port, request->buf_size, request->msg_size,
+ request->msg_amount);
+
+ TRY {
+ gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
+ }
+ CATCH_ANONYMOUS {
+ RETHROWF("Error encountered while sending the BW request: %s");
+ }
+ measIn = gras_socket_meas_accept(measMasterIn);
+
+ TRY {
+ measOut = gras_socket_client_ext(gras_socket_peer_name(peer),
+ request_ack->peer.port,
+ request->buf_size, 1);
+ }
+ CATCH_ANONYMOUS {
+ RETHROWF
+ ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
+ gras_socket_peer_name(peer), request_ack->peer.port);
+ }
+ XBT_DEBUG
+ ("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
+ request->msg_size, request->msg_amount);
+
+ *sec = 0;
+ first_pass = 1;
+ do {
+ if (first_pass == 0) {
+ double meas_duration = *sec;
+ double increase;
+ if (*sec != 0.0) {
+ increase = (min_duration / meas_duration) * 1.1;
+ } else {
+ increase = 4;
+ }
+ /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
+ if (increase > 20)
+ increase = 20;
+
+ request->msg_size = request->msg_size * increase;
+
+ /* Do not do too large experiments messages or the sensors
+ will start to swap to store one of them.
+ And then increase the number of messages to compensate (check for overflow there, too) */
+ if (request->msg_size > 64 * 1024 * 1024) {
+ unsigned long int new_amount =
+ ((request->msg_size / ((double) 64 * 1024 * 1024))
+ * request->msg_amount) + 1;
+
+ xbt_assert(new_amount > request->msg_amount,
+ "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
+ request->msg_amount = new_amount;
+
+ request->msg_size = 64 * 1024 * 1024;
+ }
+
+ XBT_VERB
+ ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
+ meas_duration, min_duration, request->msg_size,
+ request->msg_amount,
+ ((double) request->msg_size) * ((double) request->msg_amount /
+ (*sec) / 1024.0 / 1024.0));
+
+ gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
+ }
+
+ first_pass = 0;
+ *sec = gras_os_time();
+ TRY {
+ gras_socket_meas_send(measOut, 120, request->msg_size,
+ request->msg_amount);
+ XBT_DEBUG("Data sent. Wait ACK");
+ gras_socket_meas_recv(measIn, 120, 1, 1);
+ }
+ CATCH_ANONYMOUS {
+ gras_socket_close(measOut);
+ gras_socket_close(measMasterIn);
+ gras_socket_close(measIn);
+ RETHROWF("Unable to conduct the experiment: %s");
+ }
+ *sec = gras_os_time() - *sec;
+ if (*sec != 0.0) {
+ *bw =
+ ((double) request->msg_size) * ((double) request->msg_amount) /
+ (*sec);
+ }
+ XBT_DEBUG("Experiment done ; it took %f sec", *sec);
+ if (*sec <= 0) {
+ XBT_CRITICAL("Nonpositive value (%f) found for BW test time.", *sec);
+ }
+
+ } while (*sec < min_duration);
+
+ XBT_DEBUG
+ ("This measurement was long enough (%f sec; found %f b/s). Stop peer",
+ *sec, *bw);
+ gras_msg_send(peer, "BW stop", NULL);
+
+ free(request_ack);
+ free(request);
+ if (measIn != measMasterIn)
+ gras_socket_close(measIn);
+ gras_socket_close(measMasterIn);
+ gras_socket_close(measOut);