3 /* amok_bandwidth - Bandwidth testing facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
/* Default log category of this file: "amok_bw", a subcategory of "amok". */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw, amok, "Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Reference count of module initializations: amok_bw_init() increments it
 * (running its one-time branch only while it is still zero) and
 * amok_bw_exit() decrements it after testing it for zero. */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this */
/* Module initialization; all participating nodes must call it.
 * Calls nest: each call increments _amok_bw_initialized, and the branch
 * below is only taken while the counter is still zero, i.e. on the first
 * call.
 * NOTE(review): the statements inside that branch and any per-call setup
 * are not visible in this excerpt (presumably amok_bw_bw_init() and
 * amok_bw_bw_join() among others) - confirm against the full file. */
26 void amok_bw_init(void)
29 if (!_amok_bw_initialized) {
37 _amok_bw_initialized++;
40 /** @brief module finalization */
/* Module finalization: undoes one amok_bw_init() call by decrementing
 * _amok_bw_initialized.
 * NOTE(review): the statement guarded by the zero test below is not
 * visible in this excerpt (presumably an early return, so that the counter
 * never goes negative) - confirm against the full file. */
41 void amok_bw_exit(void)
43 if (!_amok_bw_initialized)
49 _amok_bw_initialized--;
52 /* ***************************************************************************
54 * ***************************************************************************/
/* Forward declarations of the message callbacks that amok_bw_bw_join()
 * registers and amok_bw_bw_leave() unregisters. */
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Describe the exchanged data types to GRAS and declare the message types
 * of the bandwidth test protocol.
 * NOTE(review): the empty "()" is an old-style declaration that accepts any
 * arguments; "(void)", as used by amok_bw_init(), would be a real
 * prototype. */
58 void amok_bw_bw_init()
60 gras_datadesc_type_t bw_request_desc, bw_res_desc;
62 /* Build the Bandwidth datatype descriptions */
/* Test request: peer to talk to, socket buffer size, size and number of
 * messages, and minimal wanted duration. It is exchanged through the
 * reference type "bw_request_t".
 * NOTE(review): fields are appended in order; presumably this order must
 * match the C declaration of s_bw_request_t - confirm when editing either. */
63 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
64 gras_datadesc_struct_append(bw_request_desc, "peer",
65 gras_datadesc_by_name("s_xbt_peer_t"));
66 gras_datadesc_struct_append(bw_request_desc, "buf_size",
67 gras_datadesc_by_name("unsigned long int"));
68 gras_datadesc_struct_append(bw_request_desc, "msg_size",
69 gras_datadesc_by_name("unsigned long int"));
70 gras_datadesc_struct_append(bw_request_desc, "msg_amount",
71 gras_datadesc_by_name("unsigned long int"));
72 gras_datadesc_struct_append(bw_request_desc, "min_duration",
73 gras_datadesc_by_name("double"));
74 gras_datadesc_struct_close(bw_request_desc);
75 bw_request_desc = gras_datadesc_ref("bw_request_t", bw_request_desc);
/* Test result: timestamp, measured duration and measured bandwidth,
 * exchanged through the reference type "bw_res_t". */
77 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
78 gras_datadesc_struct_append(bw_res_desc, "timestamp",
79 gras_datadesc_by_name("unsigned int"));
80 gras_datadesc_struct_append(bw_res_desc, "seconds",
81 gras_datadesc_by_name("double"));
82 gras_datadesc_struct_append(bw_res_desc, "bw",
83 gras_datadesc_by_name("double"));
84 gras_datadesc_struct_close(bw_res_desc);
85 bw_res_desc = gras_datadesc_ref("bw_res_t", bw_res_desc);
/* Protocol messages:
 *  - "BW handshake" (request -> request): sets up the measurement sockets;
 *  - "BW reask" (request -> no answer payload): announces new sizes before
 *    an experiment is re-run;
 *  - "BW stop" (no payload, not an RPC): ends a series of experiments;
 *  - "BW request" (request -> result): asks a node to measure towards a
 *    peer. */
87 gras_msgtype_declare_rpc("BW handshake", bw_request_desc, bw_request_desc);
89 gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL);
90 gras_msgtype_declare("BW stop", NULL);
92 gras_msgtype_declare_rpc("BW request", bw_request_desc, bw_res_desc);
/* Register the callbacks that let this node answer the "BW request" and
 * "BW handshake" messages sent by other nodes. */
95 void amok_bw_bw_join()
97 gras_cb_register("BW request", &amok_bw_cb_bw_request);
98 gras_cb_register("BW handshake", &amok_bw_cb_bw_handshake);
/* Unregister the callbacks installed by amok_bw_bw_join(). */
101 void amok_bw_bw_leave()
103 gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
104 gras_cb_unregister("BW handshake", &amok_bw_cb_bw_handshake);
108 * \brief bandwidth measurement between localhost and \e peer
110 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
111 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
112 * \arg msg_size: Size of each message sent.
113 * \arg msg_amount: Amount of such messages to exchange
114 * \arg min_duration: The minimum wanted duration. When the test messages are too small, you tend to measure the latency instead. This argument lets you force the test to last at least, say, one second.
115 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
116 * \arg bw: observed Bandwidth (in byte/s)
118 * Conduct a bandwidth test from the local process to the given peer.
119 * This call is blocking until the end of the experiment.
121 * If the asked experiment lasts less than \a min_duration, another one will be
122 * launched (and others, if needed). msg_size will be multiplied by
123 * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
124 * reach the \a min_duration). In that case, the reported bandwidth and
125 * duration are the ones of the last run. \a msg_size cannot go over 64 MiB (64*1024*1024 bytes)
126 * because we need to malloc a block of this size in real life (RL) to conduct the
127 * experiment, and we still don't want to visit the swap. In such a case, the
128 * number of messages is increased instead of their size.
130 * Results are reported in the last args, and sizes are in bytes.
132 * @warning: in SimGrid version 3.1 and earlier, the experiment size was specified
133 * as the total amount of data to send and the msg_size. This
134 * was changed for anyone wanting to send more than MAXINT
135 * bytes in a fat pipe.
138 void amok_bw_test(gras_socket_t peer,
139 unsigned long int buf_size,
140 unsigned long int msg_size,
141 unsigned long int msg_amount,
142 double min_duration, /*OUT*/ double *sec, double *bw)
/* Outline of the visible code:
 *  1. open a local measurement server socket (measMasterIn);
 *  2. send a "BW handshake" RPC to peer announcing that socket's port and
 *     the sizes, accept the peer's measurement connection (measIn), and
 *     connect to the measurement port the peer answered with (measOut);
 *  3. loop: time the sending of msg_amount messages of msg_size bytes on
 *     measOut until a 1-byte acknowledgment arrives on measIn. While a run
 *     is shorter than min_duration, enlarge the experiment, announce the
 *     new sizes with a "BW reask" RPC and run again;
 *  4. send "BW stop" to peer and close the measurement sockets.
 * NOTE(review): several lines of this function (braces, declarations of
 * port/first_pass/increase, TRY/CATCH markers) are not visible in this
 * excerpt; the comments below describe only the visible statements. */
145 /* Measurement sockets for the experiments */
146 gras_socket_t measMasterIn = NULL, measIn, measOut = NULL;
148 bw_request_t request, request_ack;
/* Find a free local port, scanning upward from 5000, for the measurement
 * server socket (buf_size is passed on as its buffer size).
 * NOTE(review): port advances twice per iteration (port++ in the loop
 * header and ++port in the call), so only the odd ports 5001, 5003, ...,
 * 9999 are tried. The "port == 10000 - 1" test below (presumably inside an
 * error handler whose CATCH line is not visible) matches the last attempt
 * under that scheme. If the double increment is removed, that test must
 * change too. */
152 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
154 measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
158 if (port == 10000 - 1) {
159 RETHROW0("Error caught while opening a measurement socket: %s");
/* Build the handshake request: our measurement port plus the sizes.
 * NOTE(review): min_duration is not copied into the request. The peer's
 * handshake callback does not read it in the visible code, so the field
 * keeps the value xbt_new0 gave it (presumably zero). */
166 request = xbt_new0(s_bw_request_t, 1);
167 request->buf_size = buf_size;
168 request->msg_size = msg_size;
169 request->msg_amount = msg_amount;
170 request->peer.name = NULL;
171 request->peer.port = gras_socket_my_port(measMasterIn);
173 ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
174 gras_socket_peer_name(peer), gras_socket_peer_port(peer),
175 request->peer.port, request->buf_size, request->msg_size,
176 request->msg_amount);
/* NOTE(review): the log call ending just above prints unsigned long sizes
 * with %ld; %lu would match their type. */
/* Ask peer to set up its side. Its answer (request_ack) carries the port of
 * its measurement server socket. The 15 is presumably the RPC timeout in
 * seconds - confirm against gras_msg_rpccall. */
179 gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
182 RETHROW0("Error encountered while sending the BW request: %s");
/* Accept the peer's measurement connection on our server socket, then
 * connect back to the measurement port announced in request_ack. */
184 measIn = gras_socket_meas_accept(measMasterIn);
187 measOut = gras_socket_client_ext(gras_socket_peer_name(peer),
188 request_ack->peer.port,
189 request->buf_size, 1);
193 ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
194 gras_socket_peer_name(peer), request_ack->peer.port);
196 DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
197 request->msg_size, request->msg_amount);
/* The branch below runs when first_pass is 0, i.e. presumably on re-runs.
 * It reads *sec, the duration of the previous run, and scales msg_size by
 * increase = 1.1 * min_duration / meas_duration. The documented cap of 20
 * on increase is applied on lines not visible here. */
202 if (first_pass == 0) {
203 double meas_duration = *sec;
206 increase = (min_duration / meas_duration) * 1.1;
210 /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
214 request->msg_size = request->msg_size * increase;
216 /* Do not do too large experiments messages or the sensors
217 will start to swap to store one of them.
218 And then increase the number of messages to compensate (check for overflow there, too) */
219 if (request->msg_size > 64 * 1024 * 1024) {
220 unsigned long int new_amount =
221 ((request->msg_size / ((double) 64 * 1024 * 1024))
222 * request->msg_amount) + 1;
/* NOTE(review): new_amount is computed in double and converted to
 * unsigned long. In C that conversion is undefined when the value is out
 * of range, so the assertion below is not a reliable overflow check. */
224 xbt_assert0(new_amount > request->msg_amount,
225 "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
226 request->msg_amount = new_amount;
228 request->msg_size = 64 * 1024 * 1024;
232 ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
233 meas_duration, min_duration, request->msg_size, request->msg_amount,
234 ((double) request->msg_size) * ((double) request->msg_amount /
235 (*sec) / 1024.0 / 1024.0));
/* NOTE(review): the "got %fMb/s" value logged just above multiplies the
 * already-updated msg_size and msg_amount by the previous run's *sec, so
 * it is not the rate that run actually achieved. */
/* Tell the peer the new sizes before re-running (the 60 is presumably a
 * timeout in seconds). */
237 gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
/* One timed run. *sec temporarily holds the start time; msg_amount
 * messages of msg_size bytes are sent on measOut, and the run ends when the
 * peer's 1-byte acknowledgment is received on measIn. */
241 *sec = gras_os_time();
243 gras_socket_meas_send(measOut, 120, request->msg_size,
244 request->msg_amount);
245 DEBUG0("Data sent. Wait ACK");
246 gras_socket_meas_recv(measIn, 120, 1, 1);
/* Failure during the run (presumably the handler of a CATCH line not
 * visible here): release all three measurement sockets before propagating
 * the error. */
248 gras_socket_close(measOut);
249 gras_socket_close(measMasterIn);
250 gras_socket_close(measIn);
251 RETHROW0("Unable to conduct the experiment: %s");
/* Turn the start time into the elapsed time of the run. */
253 *sec = gras_os_time() - *sec;
/* Total bytes sent (msg_size * msg_amount), used to compute the bandwidth.
 * The assignment target and divisor are on lines not visible here
 * (presumably *bw = bytes / *sec). */
256 ((double) request->msg_size) * ((double) request->msg_amount) /
259 DEBUG1("Experiment done ; it took %f sec", *sec);
/* A nonpositive duration makes the measurement meaningless; report it. */
261 CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
/* Repeat until a run lasts at least min_duration. */
264 } while (*sec < min_duration);
266 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
/* Tell the peer the series is over ("BW stop" carries no payload). */
268 gras_msg_send(peer, "BW stop", NULL);
/* Close the measurement sockets. measIn is closed separately only when it
 * is a different socket from measMasterIn, so no socket is closed twice. */
272 if (measIn != measMasterIn)
273 gras_socket_close(measIn);
274 gras_socket_close(measMasterIn);
275 gras_socket_close(measOut);
279 /* Callback to the "BW handshake" message:
280 opens a server measurement socket,
281 indicates its port in the answer to the RPC,
282 receives the corresponding data on the measurement socket,
283 closes the measurement sockets
287 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload)
/* Peer side of amok_bw_test(). payload points to the received
 * bw_request_t, whose peer.port is the requester's measurement server port.
 * Visible steps:
 *  - open a local measurement server socket;
 *  - return its port and the sizes as the RPC answer;
 *  - connect back to the requester's measurement port and accept the
 *    requester's measurement connection;
 *  - then repeatedly receive one experiment and acknowledge it, until
 *    "BW stop" arrives.
 * NOTE(review): braces, some local declarations (answer, port, msggot) and
 * the TRY/CATCH/switch lines are not visible in this excerpt. */
289 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
290 gras_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL;
291 bw_request_t request = *(bw_request_t *) payload;
296 gras_msg_cb_ctx_t ctx_reask;
/* Message types awaited between two experiments. It is static, so it
 * survives across calls of this callback. */
297 static xbt_dynar_t msgtwaited = NULL;
300 ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
301 gras_socket_peer_name(expeditor), request->peer.port, request->buf_size,
302 request->msg_size, request->msg_amount);
304 /* Build our answer */
305 answer = xbt_new0(s_bw_request_t, 1);
/* Find a free port for our measurement server socket, scanning 6000..10000
 * inclusive, using the buffer size the requester asked for.
 * NOTE(review): amok_bw_test() scans a different range (from 5000, below
 * 10000, stepping by two). The ranges are independent, but the choice is
 * inconsistent. */
307 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
309 measMasterIn = gras_socket_server_ext(port, request->buf_size, 1);
316 /* FIXME: tell error to remote */
318 ("Error encountered while opening a measurement server socket: %s");
/* The answer echoes the requested sizes and carries our measurement port. */
322 answer->buf_size = request->buf_size;
323 answer->msg_size = request->msg_size;
324 answer->msg_amount = request->msg_amount;
325 answer->peer.port = gras_socket_my_port(measMasterIn);
/* Send the answer as the return value of the "BW handshake" RPC (the 60 is
 * presumably a timeout in seconds). */
328 gras_msg_rpcreturn(60, ctx, &answer);
/* Error path for the answer (presumably the handler of a CATCH line not
 * visible here): release the server socket before propagating. */
331 gras_socket_close(measMasterIn);
332 /* FIXME: tell error to remote */
333 RETHROW0("Error encountered while sending the answer: %s");
337 /* Don't connect asap to leave time to other side to enter the accept() */
339 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
341 request->buf_size, 1);
345 ("Error encountered while opening a measurement socket back to %s:%d : %s",
346 gras_socket_peer_name(expeditor), request->peer.port);
347 /* FIXME: tell error to remote */
/* Accept the requester's measurement connection. */
351 measIn = gras_socket_meas_accept(measMasterIn);
353 ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
354 answer->buf_size, answer->msg_size, answer->msg_amount,
/* Error path while setting up the measurement sockets (presumably the
 * handler of a CATCH line not visible here): close all three sockets before
 * propagating. */
358 gras_socket_close(measMasterIn);
359 gras_socket_close(measIn);
360 gras_socket_close(measOut);
361 /* FIXME: tell error to remote ? */
362 RETHROW0("Error encountered while opening the meas socket: %s");
/* Build the list of awaited message types: index 0 is "BW stop" and index 1
 * is "BW reask", matching the case labels below.
 * NOTE(review): since msgtwaited is static, this creation is presumably
 * guarded by a NULL test on a line not visible here; otherwise each call
 * would leak the previous dynar. */
366 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t), NULL);
367 xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW stop"));
368 xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW reask"));
/* One experiment: receive msg_amount messages of msg_size bytes on measIn,
 * then send the 1-byte acknowledgment the requester is timing against. */
375 gras_socket_meas_recv(measIn, 120, request->msg_size,
376 request->msg_amount);
377 gras_socket_meas_send(measOut, 120, 1, 1);
379 gras_socket_close(measMasterIn);
380 gras_socket_close(measIn);
381 gras_socket_close(measOut);
382 /* FIXME: tell error to remote ? */
383 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait for either "BW stop" or "BW reask" (the 60 is presumably a timeout
 * in seconds). msggot presumably receives the index of the type that
 * arrived, which the case labels below dispatch on. */
385 gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payload);
387 case 0: /* BW stop */
390 case 1: /* BW reask */
/* The reask payload is presumably written into the payload variable itself
 * (note &payload in the wait call), hence the cast without dereference. The
 * new request gives the sizes of the next experiment. */
393 request = (bw_request_t) payload;
394 VERB0("Return the reasking RPC");
395 gras_msg_rpcreturn(60, ctx_reask, NULL);
397 gras_msg_cb_ctx_free(ctx_reask);
/* Close the measurement sockets, each exactly once: measMasterIn is closed
 * separately only when it differs from measIn. */
400 if (measIn != measMasterIn)
401 gras_socket_close(measMasterIn);
402 gras_socket_close(measIn);
403 gras_socket_close(measOut);
406 VERB0("BW experiment done.");
411 * \brief request a bandwidth measurement between two remote peers
413 * \arg from_name: Name of the first peer
414 * \arg from_port: port on which the first process is listening for messages
415 * \arg to_name: Name of the second peer
416 * \arg to_port: port on which the second process is listening (for messages, do not
417 * give a measurement socket here. The needed measurement sockets will be created
418 * automatically and negotiated between the peers)
419 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
420 * \arg msg_size: Size of each message sent.
421 * \arg msg_amount: Amount of such messages to exchange
422 * \arg sec: where the result (in seconds) should be stored.
423 * \arg bw: observed Bandwidth (in byte/s)
425 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
426 * This call is blocking until the end of the experiment.
428 * @warning: in SimGrid version 3.1 and earlier, the experiment size was specified
429 * as the total amount of data to send and the msg_size. This
430 * was changed for anyone wanting to send more than MAXINT
431 * bytes in a fat pipe.
433 * Results are reported in last args, and sizes are in bytes.
435 void amok_bw_request(const char *from_name, unsigned int from_port,
436 const char *to_name, unsigned int to_port,
437 unsigned long int buf_size,
438 unsigned long int msg_size,
439 unsigned long int msg_amount,
440 double min_duration, /*OUT*/ double *sec, double *bw)
/* Ask the process listening at from_name:from_port to run a bandwidth test
 * towards to_name:to_port. This sends it a "BW request" RPC carrying all
 * the experiment parameters, then logs the measured duration and bandwidth
 * it answers with.
 * NOTE(review): the declarations of sock and result, and the lines copying
 * the result into *sec and *bw, are not visible in this excerpt. */
445 bw_request_t request;
447 request = xbt_new0(s_bw_request_t, 1);
448 request->buf_size = buf_size;
449 request->msg_size = msg_size;
450 request->msg_amount = msg_amount;
451 request->min_duration = min_duration;
/* The const qualifier is cast away only to store the name in the request;
 * nothing in the visible code writes through it. */
454 request->peer.name = (char *) to_name;
455 request->peer.port = to_port;
/* Connect to the first peer's message socket and send it the request.
 * The 20 * 60 is presumably a 20-minute RPC timeout. */
458 sock = gras_socket_client(from_name, from_port);
462 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name, from_port,
464 gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result);
/* NOTE(review): the ports are unsigned int but are printed with %d below;
 * %u would match. The bandwidth is divided by 1024 and labelled kb/s. */
471 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
472 from_name, from_port, to_name, to_port,
473 result->sec, ((double) result->bw) / 1024.0);
475 gras_socket_close(sock);
/* Callback for "BW request": run a bandwidth test from this node towards
 * request->peer on behalf of the asker, and return the measured duration
 * and bandwidth as the RPC answer. payload points to the received
 * bw_request_t. */
480 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload)
483 /* specification of the test to run, and our answer */
484 bw_request_t request = *(bw_request_t *) payload;
485 bw_res_t result = xbt_new0(s_bw_res_t, 1);
486 gras_socket_t peer, asker;
488 asker = gras_msg_cb_ctx_from(ctx);
/* NOTE(review): the unsigned long sizes are printed with %ld below; %lu
 * would match their type. */
489 VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
490 gras_socket_peer_name(asker), gras_socket_peer_port(asker),
491 request->peer.name, request->peer.port,
492 request->msg_size, request->msg_amount);
/* Connect to the target peer. The two lines after this statement are the
 * remaining arguments of the test call that fills result->sec and
 * result->bw. The first line of that call is not visible in this excerpt
 * (presumably amok_bw_test(peer, ...)). */
493 peer = gras_socket_client(request->peer.name, request->peer.port);
495 request->buf_size, request->msg_size, request->msg_amount,
496 request->min_duration, &(result->sec), &(result->bw));
/* Send the results back to the asker (the 240 is presumably a timeout in
 * seconds). */
498 gras_msg_rpcreturn(240, ctx, &result);
501 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* Release the peer name carried by the received request (presumably
 * allocated when the message was unpacked - confirm). */
502 free(request->peer.name);
509 /** \brief builds a matrix of results of bandwidth measurement
511 * @warning: in SimGrid version 3.1 and earlier, the experiment size was specified
512 * as the total amount of data to send and the msg_size. This
513 * was changed for anyone wanting to send more than MAXINT
514 * bytes in a fat pipe.
516 double *amok_bw_matrix(xbt_dynar_t peers,
517 int buf_size_bw, int msg_size_bw, int msg_amount_bw,
521 /* construction of matrices for bandwith and latency */
525 int len = xbt_dynar_length(peers);
527 double *matrix_res = xbt_new0(double, len * len);
530 xbt_dynar_foreach(peers, i, p1) {
531 xbt_dynar_foreach(peers, j, p2) {
533 /* Mesurements of Bandwidth */
534 amok_bw_request(p1->name, p1->port, p2->name, p2->port,
535 buf_size_bw, msg_size_bw, msg_amount_bw, min_duration,
536 &sec, &matrix_res[i * len + j]);