1 /* amok_bandwidth - Bandwidth tests facilities */
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
/* Default log category of this file, "amok_bw", declared (per the macro name)
 * as a subcategory of "amok"; used by the XBT_DEBUG/XBT_VERB/XBT_CRITICAL
 * calls below. */
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw, amok, "Bandwidth testing");
16 /******************************
17 * Stuff global to the module *
18 ******************************/
/* Counts amok_bw_init() calls not yet balanced by amok_bw_exit(): incremented
 * by the former, decremented by the latter, and tested for zero by both.
 * NOTE(review): file-scope identifiers beginning with '_' are reserved by the
 * C standard (C11 7.1.3); consider renaming, e.g. amok_bw_init_count. */
20 static short _amok_bw_initialized = 0;
22 /** @brief module initialization; all participating nodes must run this */
/* Increments _amok_bw_initialized, so calls nest with amok_bw_exit() like a
 * reference count. The work guarded by the !_amok_bw_initialized test only
 * runs while no earlier initialization is outstanding (counter at zero). */
23 void amok_bw_init(void)
26 if (!_amok_bw_initialized) {
34 _amok_bw_initialized++;
37 /** @brief module finalization */
/* Decrements the counter incremented by amok_bw_init(). The leading test
 * handles the never-initialized case (counter at zero); the guarded statement
 * is presumably an early return — confirm. */
38 void amok_bw_exit(void)
40 if (!_amok_bw_initialized)
46 _amok_bw_initialized--;
49 /* ***************************************************************************
51 * ***************************************************************************/
/* Message callbacks defined later in this file; amok_bw_bw_join() registers
 * them and amok_bw_bw_leave() unregisters them. */
52 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
53 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Describes the bandwidth-test payload types to the GRAS data description
 * layer and declares the messages exchanged by the test: the "BW handshake",
 * "BW reask" and "BW request" RPCs and the one-way "BW stop" message.
 * NOTE(review): an empty parameter list in a C definition provides no
 * prototype; (void) would be more precise. */
55 void amok_bw_bw_init()
57 gras_datadesc_type_t bw_request_desc, bw_res_desc;
59 /* Build the Bandwidth datatype descriptions */
60 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61 gras_datadesc_struct_append(bw_request_desc, "peer",
62 gras_datadesc_by_name("s_xbt_peer_t"));
63 gras_datadesc_struct_append(bw_request_desc, "buf_size",
64 gras_datadesc_by_name("unsigned long int"));
65 gras_datadesc_struct_append(bw_request_desc, "msg_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc, "msg_amount",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc, "min_duration",
70 gras_datadesc_by_name("double"));
71 gras_datadesc_struct_close(bw_request_desc);
/* Messages carry the request through the reference type bw_request_t, which
 * the code in this file uses as a pointer (request->...). */
72 bw_request_desc = gras_datadesc_ref("bw_request_t", bw_request_desc);
/* Result of one test.
 * NOTE(review): the second field is described as "seconds" while the code in
 * this file accesses it as result->sec; presumably only the order and types
 * of the appended fields must match the C struct — confirm. */
74 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75 gras_datadesc_struct_append(bw_res_desc, "timestamp",
76 gras_datadesc_by_name("unsigned int"));
77 gras_datadesc_struct_append(bw_res_desc, "seconds",
78 gras_datadesc_by_name("double"));
79 gras_datadesc_struct_append(bw_res_desc, "bw",
80 gras_datadesc_by_name("double"));
81 gras_datadesc_struct_close(bw_res_desc);
82 bw_res_desc = gras_datadesc_ref("bw_res_t", bw_res_desc);
/* Message types of the test. "BW reask" has no answer payload and "BW stop"
 * has no payload at all (NULL descriptions). */
84 gras_msgtype_declare_rpc("BW handshake", bw_request_desc,
87 gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL);
88 gras_msgtype_declare("BW stop", NULL);
90 gras_msgtype_declare_rpc("BW request", bw_request_desc, bw_res_desc);
/* Registers the "BW request" and "BW handshake" callbacks so that this
 * process can serve bandwidth tests; undone by amok_bw_bw_leave(). */
93 void amok_bw_bw_join()
95 gras_cb_register("BW request", &amok_bw_cb_bw_request);
96 gras_cb_register("BW handshake", &amok_bw_cb_bw_handshake);
/* Unregisters the callbacks registered by amok_bw_bw_join(). */
99 void amok_bw_bw_leave()
101 gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
102 gras_cb_unregister("BW handshake", &amok_bw_cb_bw_handshake);
106 * \brief bandwidth measurement between localhost and \e peer
108 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
109 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
110 * \arg msg_size: Size of each message sent.
111 * \arg msg_amount: Number of such messages to exchange
112 * \arg min_duration: The minimum wanted duration. When the test messages are too small, you tend to measure the latency. This argument allows you to force the test to take at least, say, one second.
113 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
114 * \arg bw: observed Bandwidth (in byte/s)
116 * Conduct a bandwidth test from the local process to the given peer.
117 * This call is blocking until the end of the experiment.
119 * If the asked experiment lasts less than \a min_duration, another one will be
120 * launched (and others, if needed). msg_size will be multiplied by
121 * MIN(20, (\a min_duration / measured_duration) * 1.1) (the extra 10% makes sure we eventually
122 * reach \a min_duration). In that case, the reported bandwidth and
123 * duration are the ones of the last run. \a msg_size cannot exceed 64MiB
124 * because we need to malloc a block of this size in RL to conduct the
125 * experiment, and we still don't want to visit the swap. In that case, the
126 * number of messages is increased instead of their size.
128 * Results are reported in the last args, and sizes are in bytes.
130 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
131 * as the total amount of data to send and the msg_size. This
132 * was changed for those wanting to send more than MAXINT
133 * bytes in a fat pipe.
136 void amok_bw_test(gras_socket_t peer,
137 unsigned long int buf_size,
138 unsigned long int msg_size,
139 unsigned long int msg_amount,
140 double min_duration, /*OUT*/ double *sec, double *bw)
143 /* Measurement sockets for the experiments */
/* NOTE(review): presumably volatile because these are assigned inside TRY
 * blocks and read after a CATCH (setjmp-based exception macros) — confirm. */
144 volatile gras_socket_t measMasterIn = NULL, measIn, measOut = NULL;
146 bw_request_t request, request_ack;
/* Open a measurement server socket on which the peer will connect back.
 * NOTE(review): port is advanced twice per iteration (++port in the call and
 * port++ in the loop header), so only the odd ports 5001, 5003, ..., 9999 are
 * tried; the "port == 10000 - 1" test below matches the last of them. */
150 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
152 measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
156 if (port == 10000 - 1) {
157 RETHROWF("Error caught while opening a measurement socket: %s");
/* Describe the experiment; peer.port tells the peer where to connect back.
 * peer.name stays NULL: the handshake callback connects back to the address
 * of the sender (gras_socket_peer_name of the expeditor) instead. */
164 request = xbt_new0(s_bw_request_t, 1);
165 request->buf_size = buf_size;
166 request->msg_size = msg_size;
167 request->msg_amount = msg_amount;
168 request->peer.name = NULL;
169 request->peer.port = gras_socket_my_port(measMasterIn);
/* NOTE(review): buf_size, msg_size and msg_amount are unsigned long; %lu (as
 * used further down) would match them better than %ld. */
171 ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
172 gras_socket_peer_name(peer), gras_socket_peer_port(peer),
173 request->peer.port, request->buf_size, request->msg_size,
174 request->msg_amount);
/* The answer (request_ack) carries the port of the peer's own measurement
 * server socket; 15 is presumably the RPC timeout in seconds. */
177 gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
180 RETHROWF("Error encountered while sending the BW request: %s");
/* Two measurement channels: measIn accepts the peer's connection back to
 * measMasterIn, and measOut connects to the port announced in request_ack. */
182 measIn = gras_socket_meas_accept(measMasterIn);
185 measOut = gras_socket_client_ext(gras_socket_peer_name(peer),
186 request_ack->peer.port,
187 request->buf_size, 1);
191 ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
192 gras_socket_peer_name(peer), request_ack->peer.port);
195 ("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
196 request->msg_size, request->msg_amount);
/* On every pass but the first (presumably first_pass is cleared after the
 * first run), *sec still holds the previous run's duration. The experiment is
 * enlarged so that it should last at least min_duration, and the new sizes are
 * announced to the peer with "BW reask".
 * NOTE(review): a previous duration of 0 makes the division below a division
 * by zero. */
201 if (first_pass == 0) {
202 double meas_duration = *sec;
205 increase = (min_duration / meas_duration) * 1.1;
209 /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
213 request->msg_size = request->msg_size * increase;
215 /* Do not do too large experiments messages or the sensors
216 will start to swap to store one of them.
217 And then increase the number of messages to compensate (check for overflow there, too) */
218 if (request->msg_size > 64 * 1024 * 1024) {
/* NOTE(review): if the double computed below is out of range for unsigned
 * long, the conversion is undefined behavior. The assert therefore cannot
 * reliably catch that overflow; comparing the double against ULONG_MAX before
 * converting would. */
219 unsigned long int new_amount =
220 ((request->msg_size / ((double) 64 * 1024 * 1024))
221 * request->msg_amount) + 1;
223 xbt_assert(new_amount > request->msg_amount,
224 "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
225 request->msg_amount = new_amount;
227 request->msg_size = 64 * 1024 * 1024;
/* NOTE(review): the "got" rate below multiplies the msg_size/msg_amount that
 * were just scaled up, but divides by the previous run's duration (*sec). It
 * therefore overstates the measured rate. It is also bytes / 2^20 per second,
 * not megabits. */
231 ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
232 meas_duration, min_duration, request->msg_size,
234 ((double) request->msg_size) * ((double) request->msg_amount /
235 (*sec) / 1024.0 / 1024.0));
237 gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
/* Timed section: send msg_amount messages of msg_size bytes, then wait for the
 * peer's single 1-byte acknowledgement (120 is presumably a timeout in
 * seconds). */
241 *sec = gras_os_time();
243 gras_socket_meas_send(measOut, 120, request->msg_size,
244 request->msg_amount);
245 XBT_DEBUG("Data sent. Wait ACK");
246 gras_socket_meas_recv(measIn, 120, 1, 1);
/* NOTE(review): unlike the final cleanup, this error path also closes measIn
 * when it is the same socket as measMasterIn. */
249 gras_socket_close(measOut);
250 gras_socket_close(measMasterIn);
251 gras_socket_close(measIn);
252 RETHROWF("Unable to conduct the experiment: %s");
254 *sec = gras_os_time() - *sec;
/* Presumably *bw = total bytes sent (msg_size * msg_amount) / elapsed seconds. */
257 ((double) request->msg_size) * ((double) request->msg_amount) /
260 XBT_DEBUG("Experiment done ; it took %f sec", *sec);
262 XBT_CRITICAL("Nonpositive value (%f) found for BW test time.", *sec);
/* Repeat until one run lasts at least min_duration. */
265 } while (*sec < min_duration);
268 ("This measurement was long enough (%f sec; found %f b/s). Stop peer",
270 gras_msg_send(peer, "BW stop", NULL);
/* measIn is closed separately only when it is not the same socket as
 * measMasterIn, presumably so that no socket is closed twice. */
274 if (measIn != measMasterIn)
275 gras_socket_close(measIn);
276 gras_socket_close(measMasterIn);
277 gras_socket_close(measOut);
281 /* Callback to the "BW handshake" message:
282 opens a server measurement socket,
283 indicates its port in the answer to the "BW handshake" RPC,
284 receives the corresponding data on the measurement socket (again after each "BW reask", until "BW stop"),
285 closes the measurement socket
289 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload)
291 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
292 volatile gras_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL;
293 volatile bw_request_t request = *(bw_request_t *) payload;
298 gras_msg_cb_ctx_t ctx_reask;
/* Message types waited for after each experiment. The variable is static, so
 * it persists across invocations of this callback. */
299 static xbt_dynar_t msgtwaited = NULL;
302 ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
303 gras_socket_peer_name(expeditor), request->peer.port,
304 request->buf_size, request->msg_size, request->msg_amount);
306 /* Build our answer */
307 answer = xbt_new0(s_bw_request_t, 1);
/* Open our measurement server socket, trying ports 6000..10000 (inclusive)
 * one at a time. */
309 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
311 measMasterIn = gras_socket_server_ext(port, request->buf_size, 1);
318 /* FIXME: tell error to remote */
320 ("Error encountered while opening a measurement server socket: %s");
/* The answer echoes the requested sizes and announces our measurement port,
 * to which the requester connects its measOut socket. */
324 answer->buf_size = request->buf_size;
325 answer->msg_size = request->msg_size;
326 answer->msg_amount = request->msg_amount;
327 answer->peer.port = gras_socket_my_port(measMasterIn);
330 gras_msg_rpcreturn(60, ctx, &answer);
333 gras_socket_close(measMasterIn);
334 /* FIXME: tell error to remote */
335 RETHROWF("Error encountered while sending the answer: %s");
339 /* Don't connect asap to leave time to other side to enter the accept() */
/* Connect back to the requester's host, presumably on the port it put in
 * request->peer.port (the port reported by the error message below). */
341 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
343 request->buf_size, 1);
347 ("Error encountered while opening a measurement socket back to %s:%d : %s",
348 gras_socket_peer_name(expeditor), request->peer.port);
349 /* FIXME: tell error to remote */
353 measIn = gras_socket_meas_accept(measMasterIn);
355 ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
356 answer->buf_size, answer->msg_size, answer->msg_amount,
/* NOTE(review): this error path, like the one after the experiment, closes
 * measIn and measMasterIn unconditionally. The final cleanup, by contrast,
 * avoids closing them twice when they are the same socket. */
360 gras_socket_close(measMasterIn);
361 gras_socket_close(measIn);
362 gras_socket_close(measOut);
363 /* FIXME: tell error to remote ? */
364 RETHROWF("Error encountered while opening the meas socket: %s");
/* Index 0 is "BW stop" and index 1 is "BW reask"; the switch below relies on
 * this order.
 * NOTE(review): the element size is sizeof(gras_msgtype_t), but the msgtype
 * handle itself is passed to xbt_dynar_push. Confirm this matches how
 * gras_msg_wait_or compares the stored elements. */
368 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t), NULL);
369 xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW stop"));
370 xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW reask"));
/* One experiment: receive msg_amount messages of msg_size bytes, then send a
 * single 1-byte acknowledgement back (120 is presumably a timeout in
 * seconds). */
377 gras_socket_meas_recv(measIn, 120, request->msg_size,
378 request->msg_amount);
379 gras_socket_meas_send(measOut, 120, 1, 1);
382 gras_socket_close(measMasterIn);
383 gras_socket_close(measIn);
384 gras_socket_close(measOut);
385 /* FIXME: tell error to remote ? */
386 RETHROWF("Error encountered while receiving the experiment: %s");
/* Wait (presumably up to 60 seconds) for either "BW stop" or "BW reask";
 * msggot presumably receives the index of the matching entry of msgtwaited. */
388 gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payload);
390 case 0: /* BW stop */
393 case 1: /* BW reask */
/* NOTE(review): at entry the request is read as *(bw_request_t *) payload,
 * but here payload itself is cast. This presumably works because
 * gras_msg_wait_or copies the received bw_request_t pointer into payload —
 * confirm. */
396 request = (bw_request_t) payload;
397 XBT_VERB("Return the reasking RPC");
398 gras_msg_rpcreturn(60, ctx_reask, NULL);
400 gras_msg_cb_ctx_free(ctx_reask);
/* Each distinct socket is closed once: measMasterIn is skipped when it is the
 * same socket as measIn. */
403 if (measIn != measMasterIn)
404 gras_socket_close(measMasterIn);
405 gras_socket_close(measIn);
406 gras_socket_close(measOut);
409 XBT_VERB("BW experiment done.");
414 * \brief request a bandwidth measurement between two remote peers
416 * \arg from_name: Name of the first peer
417 * \arg from_port: port on which the first process is listening for messages
418 * \arg to_name: Name of the second peer
419 * \arg to_port: port on which the second process is listening (for messages, do not
420 * give a measurement socket here. The needed measurement sockets will be created
421 * automatically and negotiated between the peers)
422 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
423 * \arg msg_size: Size of each message sent.
424 * \arg msg_amount: Number of such messages to exchange
 * \arg min_duration: The minimum wanted duration of the experiment (see amok_bw_test())
425 * \arg sec: where the result (in seconds) should be stored.
426 * \arg bw: observed Bandwidth (in byte/s)
428 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
429 * This call is blocking until the end of the experiment.
431 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
432 * as the total amount of data to send and the msg_size. This
433 * was changed for those wanting to send more than MAXINT
434 * bytes in a fat pipe.
436 * Results are reported in the last args, and sizes are in bytes.
438 void amok_bw_request(const char *from_name, unsigned int from_port,
439 const char *to_name, unsigned int to_port,
440 unsigned long int buf_size,
441 unsigned long int msg_size,
442 unsigned long int msg_amount,
443 double min_duration, /*OUT*/ double *sec, double *bw)
448 bw_request_t request;
/* Describe the test; request->peer designates the second process, which the
 * first one will test against. */
450 request = xbt_new0(s_bw_request_t, 1);
451 request->buf_size = buf_size;
452 request->msg_size = msg_size;
453 request->msg_amount = msg_amount;
454 request->min_duration = min_duration;
/* NOTE(review): the cast drops const from to_name; make sure nothing frees or
 * modifies request->peer.name on this side. */
457 request->peer.name = (char *) to_name;
458 request->peer.port = to_port;
/* Ask the first process (from_name:from_port) to run the test. It answers
 * with the measured result; 20 * 60 is presumably a 20-minute timeout in
 * seconds. */
461 sock = gras_socket_client(from_name, from_port);
/* NOTE(review): the ports are unsigned int; %u would match them better than %d. */
465 XBT_DEBUG("Ask for a BW test between %s:%d and %s:%d", from_name, from_port,
467 gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result);
/* NOTE(review): bw is documented in byte/s, so bw / 1024.0 is KiB/s, although
 * the message says kb/s. */
474 XBT_VERB("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
475 from_name, from_port, to_name, to_port,
476 result->sec, ((double) result->bw) / 1024.0);
478 gras_socket_close(sock);
/* Callback to the "BW request" message: on behalf of the asker, runs a
 * bandwidth test from this process to request->peer and returns the measured
 * result in the RPC answer. */
483 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload)
486 /* specification of the test to run, and our answer */
487 bw_request_t request = *(bw_request_t *) payload;
488 bw_res_t result = xbt_new0(s_bw_res_t, 1);
489 gras_socket_t peer, asker;
491 asker = gras_msg_cb_ctx_from(ctx);
/* NOTE(review): msg_size and msg_amount are unsigned long; %lu would match
 * them better than %ld. */
492 XBT_VERB("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
493 gras_socket_peer_name(asker), gras_socket_peer_port(asker),
494 request->peer.name, request->peer.port,
495 request->msg_size, request->msg_amount);
/* Connect to the peer named in the request and run the test against it. The
 * argument list below matches amok_bw_test() and writes into result->sec and
 * result->bw. */
496 peer = gras_socket_client(request->peer.name, request->peer.port);
498 request->buf_size, request->msg_size, request->msg_amount,
499 request->min_duration, &(result->sec), &(result->bw));
/* Send the measured result back to the asker (240 is presumably a timeout in
 * seconds). */
501 gras_msg_rpcreturn(240, ctx, &result);
504 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* The peer name arrived with the received request; release it. */
505 free(request->peer.name);
512 /** \brief builds a matrix of bandwidth measurement results between the given peers
514 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
515 * as the total amount of data to send and the msg_size. This
516 * was changed for those wanting to send more than MAXINT
517 * bytes in a fat pipe.
519 double *amok_bw_matrix(xbt_dynar_t peers,
520 int buf_size_bw, int msg_size_bw, int msg_amount_bw,
524 /* construction of matrices for bandwith and latency */
528 int len = xbt_dynar_length(peers);
530 double *matrix_res = xbt_new0(double, len * len);
533 xbt_dynar_foreach(peers, i, p1) {
534 xbt_dynar_foreach(peers, j, p2) {
536 /* Mesurements of Bandwidth */
537 amok_bw_request(p1->name, p1->port, p2->name, p2->port,
538 buf_size_bw, msg_size_bw, msg_amount_bw,
539 min_duration, &sec, &matrix_res[i * len + j]);