1 /* amok_bandwidth - Bandwidth tests facilities */
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
/* Logging setup for this file: going by the macro name and its arguments,
 * this declares the "amok_bw" category under "amok" and makes it the default
 * one for the logging calls below (DEBUGn/VERBn/CRITICALn) -- TODO confirm
 * against the xbt log documentation. */
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw, amok, "Bandwidth testing");
16 /******************************
17 * Stuff global to the module *
18 ******************************/
/* Initialization counter of the module: incremented by amok_bw_init() and
 * decremented by amok_bw_exit(); both functions test it against zero. */
20 static short _amok_bw_initialized = 0;
22 /** @brief module initialization; all participating nodes must run this
 *
 * The section opened by the test on _amok_bw_initialized is entered only
 * while that counter is still zero. The counter is incremented afterwards,
 * which is what amok_bw_exit() later decrements.
 *
 * NOTE(review): the guarded statements and the closing brace of the `if`
 * are not visible in this excerpt -- confirm whether the increment runs on
 * every call or only on the first one.
 */
23 void amok_bw_init(void)
26 if (!_amok_bw_initialized) {
34 _amok_bw_initialized++;
37 /** @brief module finalization
 *
 * Tests whether the _amok_bw_initialized counter is zero, and decrements the
 * counter further down.
 *
 * NOTE(review): the statement guarded by the test is not visible in this
 * excerpt -- presumably an early return for a module that was never
 * initialized; confirm.
 */
38 void amok_bw_exit(void)
40 if (!_amok_bw_initialized)
46 _amok_bw_initialized--;
49 /* ***************************************************************************
51 * ***************************************************************************/
/* Forward declarations of the two message callbacks defined later in this
 * file; amok_bw_bw_join() registers them and amok_bw_bw_leave() unregisters
 * them. */
52 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
53 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Declares the data descriptions and the message types used by the bandwidth
 * tests of this file.
 *
 * Two structure descriptions are built field by field:
 *  - "s_bw_request_t": peer, buf_size, msg_size, msg_amount, min_duration;
 *  - "s_bw_res_t": timestamp, seconds, bw.
 * Each one is then passed to gras_datadesc_ref() (under the names
 * "bw_request_t" and "bw_res_t") and it is the description returned by that
 * call which is handed to the message declarations.
 *
 * Four message types are declared: the RPCs "BW handshake", "BW reask" and
 * "BW request", and the plain message "BW stop".
 *
 * NOTE(review): the result field is described as "seconds" here while the
 * rest of the file accesses it as result->sec -- confirm whether the
 * description name has to match the C member name.
 */
55 void amok_bw_bw_init()
57 gras_datadesc_type_t bw_request_desc, bw_res_desc;
59 /* Build the Bandwidth datatype descriptions */
60 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61 gras_datadesc_struct_append(bw_request_desc, "peer",
62 gras_datadesc_by_name("s_xbt_peer_t"));
63 gras_datadesc_struct_append(bw_request_desc, "buf_size",
64 gras_datadesc_by_name("unsigned long int"));
65 gras_datadesc_struct_append(bw_request_desc, "msg_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc, "msg_amount",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc, "min_duration",
70 gras_datadesc_by_name("double"));
71 gras_datadesc_struct_close(bw_request_desc);
/* From here on bw_request_desc designates the "bw_request_t" description
 * returned by gras_datadesc_ref(), no longer the bare structure. */
72 bw_request_desc = gras_datadesc_ref("bw_request_t", bw_request_desc);
/* Same construction for the result of a measurement. */
74 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75 gras_datadesc_struct_append(bw_res_desc, "timestamp",
76 gras_datadesc_by_name("unsigned int"));
77 gras_datadesc_struct_append(bw_res_desc, "seconds",
78 gras_datadesc_by_name("double"));
79 gras_datadesc_struct_append(bw_res_desc, "bw",
80 gras_datadesc_by_name("double"));
81 gras_datadesc_struct_close(bw_res_desc);
82 bw_res_desc = gras_datadesc_ref("bw_res_t", bw_res_desc);
/* RPC whose request and answer are both declared with bw_request_desc. */
84 gras_msgtype_declare_rpc("BW handshake", bw_request_desc, bw_request_desc);
/* RPC declared with bw_request_desc for the request and NULL for the answer. */
86 gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL);
/* Plain (non-RPC) message declared with a NULL payload description. */
87 gras_msgtype_declare("BW stop", NULL);
/* RPC taking bw_request_desc and answering with bw_res_desc. */
89 gras_msgtype_declare_rpc("BW request", bw_request_desc, bw_res_desc);
/* Registers the callbacks of this file for the "BW request" and
 * "BW handshake" messages; amok_bw_bw_leave() undoes it. */
92 void amok_bw_bw_join()
94 gras_cb_register("BW request", &amok_bw_cb_bw_request);
95 gras_cb_register("BW handshake", &amok_bw_cb_bw_handshake);
/* Unregisters the "BW request" and "BW handshake" callbacks that
 * amok_bw_bw_join() registered. */
98 void amok_bw_bw_leave()
100 gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
101 gras_cb_unregister("BW handshake", &amok_bw_cb_bw_handshake);
105 * \brief bandwidth measurement between localhost and \e peer
107 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
108 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
109 * \arg msg_size: Size of each message sent.
110 * \arg msg_amount: Amount of such messages to exchange
111 * \arg min_duration: The minimum wanted duration. When the test message is too small, you tend to measure the latency. This argument allows you to force the test to take at least, say, one second.
112 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
113 * \arg bw: observed Bandwidth (in byte/s)
115 * Conduct a bandwidth test from the local process to the given peer.
116 * This call is blocking until the end of the experiment.
118 * If the asked experiment lasts less than \a min_duration, another one will be
119 * launched (and others, if needed). msg_size will be multiplied by
120 * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
121 * reach the \a min_duration). In that case, the reported bandwidth and
122 * duration are the ones of the last run. \a msg_size cannot go over 64Mb
123 * because we need to malloc a block of this size in RL to conduct the
124 * experiment, and we still don't want to visit the swap. In such case, the
125 * number of messages is increased instead of their size.
127 * Results are reported in last args, and sizes are in bytes.
129 * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
130 * as the total amount of data to send and the msg_size. This
131 * was changed for the fool wanting to send more than MAXINT
132 * bytes in a fat pipe.
/* Implementation overview (only the statements visible in this excerpt are
 * described; the exception-handling scaffolding around them is not shown):
 *
 *  1. open a listening measurement socket (measMasterIn) on a port searched
 *     upward from 5000;
 *  2. fill a request with the experiment sizes and the port just opened, and
 *     send it to the peer through the "BW handshake" RPC;
 *  3. accept the peer's connection on measMasterIn (measIn) and open measOut
 *     toward the port found in the handshake answer;
 *  4. run the experiment in a loop: send msg_amount messages of msg_size on
 *     measOut, wait for the acknowledgment on measIn, and store the elapsed
 *     gras_os_time() in *sec. On every pass but the first, the sizes are
 *     scaled up beforehand and the peer is told through "BW reask";
 *  5. once *sec reaches min_duration, send "BW stop" and close the sockets.
 *
 * NOTE(review): several log formats below print unsigned long values with
 * %ld where other messages of this file use %lu.
 */
135 void amok_bw_test(gras_socket_t peer,
136 unsigned long int buf_size,
137 unsigned long int msg_size,
138 unsigned long int msg_amount,
139 double min_duration, /*OUT*/ double *sec, double *bw)
142 /* Measurement sockets for the experiments */
143 gras_socket_t measMasterIn = NULL, measIn, measOut = NULL;
145 bw_request_t request, request_ack;
/* NOTE(review): as far as the visible lines show, `port` advances twice per
 * iteration (the ++port in the call plus the port++ of the loop header), so
 * only the odd ports 5001..9999 are tried -- confirm this is intended. */
149 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
151 measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
155 if (port == 10000 - 1) {
156 RETHROW0("Error caught while opening a measurement socket: %s");
/* Describe the experiment to the peer; peer.port is the port of our own
 * listening measurement socket, so that the peer can connect back to it. */
163 request = xbt_new0(s_bw_request_t, 1);
164 request->buf_size = buf_size;
165 request->msg_size = msg_size;
166 request->msg_amount = msg_amount;
167 request->peer.name = NULL;
168 request->peer.port = gras_socket_my_port(measMasterIn);
170 ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
171 gras_socket_peer_name(peer), gras_socket_peer_port(peer),
172 request->peer.port, request->buf_size, request->msg_size,
173 request->msg_amount);
176 gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
179 RETHROW0("Error encountered while sending the BW request: %s");
/* The peer connects back to us (measIn) and we connect to the measurement
 * port it announced in its answer (measOut). */
181 measIn = gras_socket_meas_accept(measMasterIn);
184 measOut = gras_socket_client_ext(gras_socket_peer_name(peer),
185 request_ack->peer.port,
186 request->buf_size, 1);
190 ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
191 gras_socket_peer_name(peer), request_ack->peer.port);
193 DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
194 request->msg_size, request->msg_amount);
/* Not the first pass: the previous run was shorter than min_duration, so
 * enlarge the experiment before running it again. */
199 if (first_pass == 0) {
200 double meas_duration = *sec;
203 increase = (min_duration / meas_duration) * 1.1;
207 /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
211 request->msg_size = request->msg_size * increase;
213 /* Do not do too large experiments messages or the sensors
214 will start to swap to store one of them.
215 And then increase the number of messages to compensate (check for overflow there, too) */
216 if (request->msg_size > 64 * 1024 * 1024) {
217 unsigned long int new_amount =
218 ((request->msg_size / ((double) 64 * 1024 * 1024))
219 * request->msg_amount) + 1;
221 xbt_assert0(new_amount > request->msg_amount,
222 "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
223 request->msg_amount = new_amount;
225 request->msg_size = 64 * 1024 * 1024;
229 ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
230 meas_duration, min_duration, request->msg_size, request->msg_amount,
231 ((double) request->msg_size) * ((double) request->msg_amount /
232 (*sec) / 1024.0 / 1024.0));
/* NOTE(review): the "got %fMb/s" value of the message above is computed from
 * the already enlarged msg_size/msg_amount together with the duration of the
 * previous run, so it does not seem to be the bandwidth actually observed --
 * confirm. */
234 gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
/* *sec first holds the start time, then (below) the elapsed time. */
238 *sec = gras_os_time();
240 gras_socket_meas_send(measOut, 120, request->msg_size,
241 request->msg_amount);
242 DEBUG0("Data sent. Wait ACK");
/* Presumably a single one-byte message sent back by the peer (its callback
 * calls gras_socket_meas_send() with the same 1, 1 arguments) -- TODO
 * confirm the meaning of the last two arguments. */
243 gras_socket_meas_recv(measIn, 120, 1, 1);
245 gras_socket_close(measOut);
246 gras_socket_close(measMasterIn);
247 gras_socket_close(measIn);
248 RETHROW0("Unable to conduct the experiment: %s");
250 *sec = gras_os_time() - *sec;
253 ((double) request->msg_size) * ((double) request->msg_amount) /
256 DEBUG1("Experiment done ; it took %f sec", *sec);
258 CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
261 } while (*sec < min_duration);
263 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
265 gras_msg_send(peer, "BW stop", NULL);
/* measIn is closed separately only when it is a distinct socket from
 * measMasterIn. */
269 if (measIn != measMasterIn)
270 gras_socket_close(measIn);
271 gras_socket_close(measMasterIn);
272 gras_socket_close(measOut);
276 /* Callback to the "BW handshake" message:
277 opens a server measurement socket,
278 indicate its port in the answer to the "BW handshake" RPC,
279 receive the corresponding data on the measurement socket,
280 close the measurement socket
/* Receiver side of a bandwidth experiment (only the statements visible in
 * this excerpt are described; the exception-handling scaffolding and some
 * declarations are not shown):
 *
 *  - ctx identifies the incoming "BW handshake" RPC; payload is read as a
 *    pointer to the bw_request_t sent by the asker;
 *  - a listening measurement socket is opened on a port searched from 6000,
 *    and its port is returned to the asker in the RPC answer;
 *  - a measurement socket is opened back to the asker (measOut) and the
 *    asker's connection is accepted (measIn);
 *  - the announced amount of data is then received on measIn and
 *    acknowledged on measOut, after which the callback waits for either
 *    "BW stop" or "BW reask"; a reask provides the request for a further run;
 *  - finally the measurement sockets are closed.
 */
284 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload)
286 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
287 gras_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL;
288 bw_request_t request = *(bw_request_t *) payload;
293 gras_msg_cb_ctx_t ctx_reask;
/* Static: the list of awaited message types persists across invocations of
 * this callback. */
294 static xbt_dynar_t msgtwaited = NULL;
297 ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
298 gras_socket_peer_name(expeditor), request->peer.port, request->buf_size,
299 request->msg_size, request->msg_amount);
301 /* Build our answer */
302 answer = xbt_new0(s_bw_request_t, 1);
/* Try the ports 6000..10000 in turn until a server socket could be opened. */
304 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
306 measMasterIn = gras_socket_server_ext(port, request->buf_size, 1);
313 /* FIXME: tell error to remote */
315 ("Error encountered while opening a measurement server socket: %s");
/* The answer echoes the requested sizes and adds the port we listen on. */
319 answer->buf_size = request->buf_size;
320 answer->msg_size = request->msg_size;
321 answer->msg_amount = request->msg_amount;
322 answer->peer.port = gras_socket_my_port(measMasterIn);
325 gras_msg_rpcreturn(60, ctx, &answer);
328 gras_socket_close(measMasterIn);
329 /* FIXME: tell error to remote */
330 RETHROW0("Error encountered while sending the answer: %s");
334 /* Don't connect asap to leave time to other side to enter the accept() */
336 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
338 request->buf_size, 1);
342 ("Error encountered while opening a measurement socket back to %s:%d : %s",
343 gras_socket_peer_name(expeditor), request->peer.port);
344 /* FIXME: tell error to remote */
348 measIn = gras_socket_meas_accept(measMasterIn);
350 ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
351 answer->buf_size, answer->msg_size, answer->msg_amount,
355 gras_socket_close(measMasterIn);
356 gras_socket_close(measIn);
357 gras_socket_close(measOut);
358 /* FIXME: tell error to remote ? */
359 RETHROW0("Error encountered while opening the meas socket: %s");
/* Message types awaited after each run, in this order: "BW stop" first,
 * "BW reask" second (the case labels further down are annotated the same
 * way).
 * NOTE(review): xbt_dynar_push() is handed the msgtype value itself rather
 * than the address of a variable holding it -- check against the
 * xbt_dynar_push() contract that this is what is expected. */
363 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t), NULL);
364 xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW stop"));
365 xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW reask"));
/* One run of the experiment: receive the announced data, then acknowledge
 * with the same 1, 1 arguments the sender passes to its
 * gras_socket_meas_recv(). */
372 gras_socket_meas_recv(measIn, 120, request->msg_size,
373 request->msg_amount);
374 gras_socket_meas_send(measOut, 120, 1, 1);
376 gras_socket_close(measMasterIn);
377 gras_socket_close(measIn);
378 gras_socket_close(measOut);
379 /* FIXME: tell error to remote ? */
380 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait for the sender's verdict; the incoming payload is stored through
 * &payload, i.e. the `payload` parameter is reused as receiving variable. */
382 gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payload);
384 case 0: /* BW stop */
387 case 1: /* BW reask */
/* The reask payload becomes the request used from now on. */
390 request = (bw_request_t) payload;
391 VERB0("Return the reasking RPC");
392 gras_msg_rpcreturn(60, ctx_reask, NULL);
394 gras_msg_cb_ctx_free(ctx_reask);
/* NOTE(review): here the conditional close applies to measMasterIn, whereas
 * amok_bw_test() applies the same test to measIn -- confirm that both
 * variants are intended. */
397 if (measIn != measMasterIn)
398 gras_socket_close(measMasterIn);
399 gras_socket_close(measIn);
400 gras_socket_close(measOut);
403 VERB0("BW experiment done.");
408 * \brief request a bandwidth measurement between two remote peers
410 * \arg from_name: Name of the first peer
411 * \arg from_port: port on which the first process is listening for messages
412 * \arg to_name: Name of the second peer
413 * \arg to_port: port on which the second process is listening (for messages, do not
414 * give a measurement socket here. The needed measurement sockets will be created
415 * automatically and negotiated between the peers)
416 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
417 * \arg msg_size: Size of each message sent.
418 * \arg msg_amount: Amount of such messages to exchange
419 * \arg sec: where the result (in seconds) should be stored.
420 * \arg bw: observed Bandwidth (in byte/s)
422 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
423 * This call is blocking until the end of the experiment.
425 * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
426 * as the total amount of data to send and the msg_size. This
427 * was changed for the fool wanting to send more than MAXINT
428 * bytes in a fat pipe.
430 * Results are reported in last args, and sizes are in bytes.
/* Implementation overview (only the statements visible in this excerpt are
 * described): a request describing the experiment and its target
 * (to_name:to_port) is built, a socket is opened to from_name:from_port, and
 * the "BW request" RPC is called on it; the answer is received in `result`,
 * whose sec and bw fields are logged before the socket is closed.
 *
 * NOTE(review): the statements storing the answer into *sec and *bw, and
 * those releasing `request` and `result`, are not visible here -- confirm
 * they exist.
 */
432 void amok_bw_request(const char *from_name, unsigned int from_port,
433 const char *to_name, unsigned int to_port,
434 unsigned long int buf_size,
435 unsigned long int msg_size,
436 unsigned long int msg_amount,
437 double min_duration, /*OUT*/ double *sec, double *bw)
442 bw_request_t request;
/* Experiment parameters forwarded as-is to the remote process. */
444 request = xbt_new0(s_bw_request_t, 1);
445 request->buf_size = buf_size;
446 request->msg_size = msg_size;
447 request->msg_amount = msg_amount;
448 request->min_duration = min_duration;
/* The request's peer field designates the *second* peer; the cast only
 * drops the const qualifier of to_name, the string is not copied here. */
451 request->peer.name = (char *) to_name;
452 request->peer.port = to_port;
/* The RPC itself is addressed to the *first* peer. */
455 sock = gras_socket_client(from_name, from_port);
459 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name, from_port,
461 gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result);
468 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
469 from_name, from_port, to_name, to_port,
470 result->sec, ((double) result->bw) / 1024.0);
472 gras_socket_close(sock);
/* Callback of the "BW request" RPC (registered in amok_bw_bw_join()).
 *
 * ctx identifies the incoming RPC and gives access to the asker's socket;
 * payload is read as a pointer to the bw_request_t describing the experiment
 * and the peer to run it with. The visible statements open a socket to that
 * peer, fill result->sec and result->bw through a call taking the request's
 * sizes and min_duration, return `result` as the RPC answer, close the socket
 * and free the peer name of the request.
 *
 * NOTE(review): the head of the call that fills `result` is on a line not
 * visible in this excerpt (presumably amok_bw_test(peer, ...) -- confirm),
 * as is the release of `request` and `result`.
 */
477 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload)
480 /* specification of the test to run, and our answer */
481 bw_request_t request = *(bw_request_t *) payload;
482 bw_res_t result = xbt_new0(s_bw_res_t, 1);
483 gras_socket_t peer, asker;
485 asker = gras_msg_cb_ctx_from(ctx);
486 VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
487 gras_socket_peer_name(asker), gras_socket_peer_port(asker),
488 request->peer.name, request->peer.port,
489 request->msg_size, request->msg_amount);
/* Regular (non-measurement) socket to the peer named in the request. */
490 peer = gras_socket_client(request->peer.name, request->peer.port);
492 request->buf_size, request->msg_size, request->msg_amount,
493 request->min_duration, &(result->sec), &(result->bw));
495 gras_msg_rpcreturn(240, ctx, &result);
498 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
499 free(request->peer.name);
506 /** \brief builds a matrix of results of bandwidth measurement
508 * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
509 * as the total amount of data to send and the msg_size. This
510 * was changed for the fool wanting to send more than MAXINT
511 * bytes in a fat pipe.
/* Implementation overview (the end of the parameter list, some declarations
 * and the end of the function are not visible in this excerpt): a zeroed
 * array of len * len doubles is allocated, len being the number of elements
 * of `peers`, and amok_bw_request() is called inside two nested traversals
 * of `peers`, its bandwidth result going to entry [i * len + j].
 *
 * NOTE(review): the sizes are plain int here but unsigned long int in
 * amok_bw_request(), so a negative value would be converted to a very large
 * one -- confirm callers never pass negative sizes.
 */
513 double *amok_bw_matrix(xbt_dynar_t peers,
514 int buf_size_bw, int msg_size_bw, int msg_amount_bw,
518 /* construction of the bandwidth result matrix (len x len entries, entry [i * len + j] holding the p1 -> p2 result) */
522 int len = xbt_dynar_length(peers);
524 double *matrix_res = xbt_new0(double, len * len);
527 xbt_dynar_foreach(peers, i, p1) {
528 xbt_dynar_foreach(peers, j, p2) {
530 /* Measurements of Bandwidth */
531 amok_bw_request(p1->name, p1->port, p2->name, p2->port,
532 buf_size_bw, msg_size_bw, msg_amount_bw, min_duration,
533 &sec, &matrix_res[i * len + j]);