3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Initialization counter of the module: incremented by amok_bw_init()
 * and decremented by amok_bw_exit(); zero means "not initialized". */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this
 *
 * The statements guarded by the `! _amok_bw_initialized` test only run
 * while the counter is still zero; the counter is incremented afterwards.
 * NOTE(review): the guarded statements and whatever sits between the test
 * and the increment are not visible in this excerpt.
 */
26 void amok_bw_init(void) {
28 if (! _amok_bw_initialized) {
36 _amok_bw_initialized++;
39 /** @brief module finalization
 *
 * Tests whether the module was initialized at all, then decrements the
 * initialization counter.
 * NOTE(review): the statement controlled by the `if` (presumably an early
 * return) is not visible in this excerpt.
 */
40 void amok_bw_exit(void) {
41 if (! _amok_bw_initialized)
47 _amok_bw_initialized--;
50 /* ***************************************************************************
52 * ***************************************************************************/
/* Forward declarations of the message callbacks, which are registered in
 * amok_bw_bw_join() and unregistered in amok_bw_bw_leave(). */
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Declare the data descriptions and the message types used by the
 * bandwidth tests: the request structure (s_bw_request_t), the result
 * structure (s_bw_res_t), and the "BW handshake", "BW reask", "BW stop"
 * and "BW request" messages. */
56 void amok_bw_bw_init() {
57 gras_datadesc_type_t bw_request_desc, bw_res_desc;
59 /* Build the Bandwidth datatype descriptions */
/* Request: target peer, socket buffer size, total experiment size,
   size of each message, and minimal wanted duration */
60 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61 gras_datadesc_struct_append(bw_request_desc,"peer",
62 gras_datadesc_by_name("s_xbt_peer_t"));
63 gras_datadesc_struct_append(bw_request_desc,"buf_size",
64 gras_datadesc_by_name("unsigned long int"));
65 gras_datadesc_struct_append(bw_request_desc,"exp_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc,"msg_size",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc,"min_duration",
70 gras_datadesc_by_name("double"));
71 gras_datadesc_struct_close(bw_request_desc);
/* From here on, bw_request_desc describes "bw_request_t", declared as a
   reference to the structure above */
72 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Result: a timestamp, the duration of the experiment and the bandwidth */
74 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78 gras_datadesc_struct_close(bw_res_desc);
/* Likewise, bw_res_desc now describes "bw_res_t", a reference to s_bw_res_t */
79 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* "BW handshake": RPC declared with a bw_request_t for both payloads */
81 gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
/* "BW reask": RPC declared with a bw_request_t and NULL;
   "BW stop": plain message declared with a NULL payload */
83 gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84 gras_msgtype_declare("BW stop", NULL);
/* "BW request": RPC declared with a bw_request_t and a bw_res_t */
86 gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
/* Register the callbacks handling the "BW request" and "BW handshake"
 * messages. */
88 void amok_bw_bw_join() {
89 gras_cb_register(gras_msgtype_by_name("BW request"),
90 &amok_bw_cb_bw_request);
91 gras_cb_register(gras_msgtype_by_name("BW handshake"),
92 &amok_bw_cb_bw_handshake);
/* Unregister the callbacks installed by amok_bw_bw_join() for the
 * "BW request" and "BW handshake" messages. */
94 void amok_bw_bw_leave() {
95 gras_cb_unregister(gras_msgtype_by_name("BW request"),
96 &amok_bw_cb_bw_request);
97 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98 &amok_bw_cb_bw_handshake);
102 * \brief bandwidth measurement between localhost and \e peer
104 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
105 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
106 * \arg exp_size: Total size of data sent across the network
107 * \arg msg_size: Size of each message sent. I.e., (\e exp_size / \e msg_size) messages, rounded up, will be sent.
108 * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110 * \arg bw: observed Bandwidth (in byte/s)
112 * Conduct a bandwidth test from the local process to the given peer.
113 * This call is blocking until the end of the experiment.
115 * If the asked experiment lasts less than \a min_duration, another one will be
116 * launched. Sizes (both \a exp_size and \a msg_size) will be multiplied by
117 * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118 * reach the \a min_duration). In that case, the reported bandwidth and
119 * duration are the ones of the last run. \a msg_size cannot go over 64MB
120 * because we need to malloc a block of this size in RL to conduct the
121 * experiment, and we still don't want to visit the swap.
123 * Results are reported in last args, and sizes are in byte.
125 void amok_bw_test(gras_socket_t peer,
126 unsigned long int buf_size,
127 unsigned long int exp_size,
128 unsigned long int msg_size,
130 /*OUT*/ double *sec, double *bw) {
/* Overview of the visible steps:
 *  1. open a local measurement server socket (measMasterIn);
 *  2. send a "BW handshake" RPC to the peer, announcing our port, and
 *     read the peer's measurement port from the answer (request_ack);
 *  3. accept the incoming measurement connection (measIn) and open the
 *     outgoing one (measOut);
 *  4. repeat the experiment until it lasts at least min_duration,
 *     enlarging the sizes between two runs;
 *  5. send "BW stop" to the peer and close the measurement sockets.
 * NOTE(review): several statements of this function (among them the
 * handlers surrounding the RETHROW calls) are not visible in this
 * excerpt. */
132 /* Measurement sockets for the experiments */
133 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
135 bw_request_t request,request_ack;
/* Number of messages: exp_size / msg_size, rounded up.
 * NOTE(review): no guard against msg_size == 0 is visible before this
 * division. */
138 int nb_messages = (exp_size % msg_size == 0) ?
139 (exp_size / msg_size) : (exp_size / msg_size + 1);
/* Look for a usable port for the measurement server socket.
 * NOTE(review): port is incremented both by the loop header and by the
 * ++port in the call below, so only every other port (5001, 5003, ...)
 * is tried. */
141 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
143 measMasterIn = gras_socket_server_ext(++port,buf_size,1);
/* The error is rethrown only for the last candidate port */
146 if (port == 10000 -1) {
147 RETHROW0("Error caught while opening a measurement socket: %s");
/* Build the handshake request; exp_size is rounded up to a whole number
   of messages, and peer.port announces our measurement server port */
154 request=xbt_new0(s_bw_request_t,1);
155 request->buf_size=buf_size;
156 request->exp_size=msg_size * nb_messages;
157 request->msg_size=msg_size;
158 request->peer.name = NULL;
159 request->peer.port = gras_socket_my_port(measMasterIn);
160 DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)",
161 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
162 buf_size,request->buf_size);
/* Handshake RPC (timeout argument: 15); the answer lands in request_ack */
165 gras_msg_rpccall(peer,15,
166 gras_msgtype_by_name("BW handshake"),&request, &request_ack);
168 RETHROW0("Error encountered while sending the BW request: %s");
/* Accept the measurement connection coming in on our server socket */
170 measIn = gras_socket_meas_accept(measMasterIn);
/* Open our outgoing measurement socket to the port announced in the
   handshake answer */
173 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174 request_ack->peer.port,
175 request->buf_size,1);
177 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178 gras_socket_peer_name(peer),request_ack->peer.port);
180 DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
181 request->exp_size, request->msg_size);
/* When first_pass is 0, a previous run left its duration in *sec: enlarge
 * the experiment before redoing it.
 * NOTE(review): first_pass and increase are declared and updated on lines
 * not visible in this excerpt. */
186 if (first_pass == 0) {
187 double meas_duration=*sec;
/* Growth factor: ratio of the wanted to the measured duration, plus 10% */
190 increase = (min_duration / meas_duration) * 1.1;
194 /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
198 request->msg_size = request->msg_size * increase;
200 /* Do not do too large experiments messages or the sensors
201 will start to swap to store one of them.
202 And then increase the number of messages to compensate */
203 if (request->msg_size > 64*1024*1024) {
204 nb_messages = ( (request->msg_size / ((double)64*1024*1024))
206 request->msg_size = 64*1024*1024;
209 VERB6("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%lu msg_size=%lu (nb_messages=%d) (got %fkb/s)",
210 meas_duration, min_duration,
211 request->exp_size, request->msg_size, nb_messages,
212 ((double)request->exp_size) / *sec/1024);
/* NOTE(review): as written, this asserts that the previous exp_size is
 * strictly greater than the new msg_size * nb_messages, although the
 * experiment is being enlarged; the condition looks inverted -- confirm. */
214 xbt_assert0(request->exp_size > request->msg_size * nb_messages,
215 "Overflow on the experiment size! You must have a *really* fat pipe. Please fix your platform");
216 request->exp_size = request->msg_size * nb_messages;
/* Announce the new sizes to the peer with the "BW reask" RPC
   (timeout argument: 60) */
219 gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
/* Send the experiment data, then wait for the (1,1)-sized acknowledgement
   of the peer */
225 gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
226 DEBUG0("Data sent. Wait ACK");
227 gras_socket_meas_recv(measIn,120,1,1);
/* The three measurement sockets are closed before the error is rethrown */
229 gras_socket_close(measOut);
230 gras_socket_close(measMasterIn);
231 gras_socket_close(measIn);
232 RETHROW0("Unable to conduct the experiment: %s");
/* Elapsed time of this run; this relies on *sec holding the start time,
   which is stored on a line not visible in this excerpt */
234 *sec = gras_os_time() - *sec;
/* Bandwidth = bytes sent / elapsed seconds; skipped when the elapsed time
   is exactly 0 */
235 if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
236 DEBUG1("Experiment done ; it took %f sec", *sec);
238 CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
/* Run again as long as the measured duration is below min_duration */
241 } while (*sec < min_duration);
243 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
/* Send "BW stop" (no payload) to the peer */
245 gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);
/* Release the measurement sockets; measIn is skipped when it is the same
   socket as measMasterIn */
249 if (measIn != measMasterIn)
250 gras_socket_close(measIn);
251 gras_socket_close(measMasterIn);
252 gras_socket_close(measOut);
256 /* Callback to the "BW handshake" message:
257 opens a server measurement socket,
258 indicates its port in the answer to the "BW handshake" RPC,
259 receives the corresponding data on the measurement socket,
260 closes the measurement socket
264 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx,
/* Overview of the visible steps:
 *  1. read the request from the payload and open a measurement server
 *     socket on a port searched from 6000 up to 10000;
 *  2. answer the RPC with a copy of the sizes and our measurement port;
 *  3. connect back to the asker (measOut) and accept its incoming
 *     measurement connection (measIn);
 *  4. receive the experiment data and acknowledge it, then wait for
 *     either "BW stop" or "BW reask";
 *  5. close the measurement sockets.
 * NOTE(review): several statements of this function (among them the
 * handlers surrounding the RETHROW calls and the loop around step 4) are
 * not visible in this excerpt. */
266 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
267 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
/* The payload is read as a pointer to a bw_request_t */
268 bw_request_t request=*(bw_request_t*)payload;
273 gras_msg_cb_ctx_t ctx_reask;
/* Static: the list of awaited message types is kept across calls */
274 static xbt_dynar_t msgtwaited=NULL;
276 DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
277 gras_socket_peer_name(expeditor),request->peer.port,
278 request->buf_size,request->exp_size,request->msg_size);
280 /* Build our answer */
281 answer = xbt_new0(s_bw_request_t,1);
/* Look for a usable port for our measurement server socket */
283 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
285 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
291 /* FIXME: tell error to remote */
292 RETHROW0("Error encountered while opening a measurement server socket: %s");
/* The answer repeats the requested sizes and announces our port */
296 answer->buf_size=request->buf_size;
297 answer->exp_size=request->exp_size;
298 answer->msg_size=request->msg_size;
299 answer->peer.port=gras_socket_my_port(measMasterIn);
/* Answer the handshake RPC (timeout argument: 60) */
302 gras_msg_rpcreturn(60,ctx,&answer);
304 gras_socket_close(measMasterIn);
305 /* FIXME: tell error to remote */
306 RETHROW0("Error encountered while sending the answer: %s");
310 /* Don't connect asap to leave time to other side to enter the accept() */
312 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
314 request->buf_size,1);
316 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
317 gras_socket_peer_name(expeditor),request->peer.port);
318 /* FIXME: tell error to remote */
/* Accept the measurement connection coming in on our server socket */
322 measIn = gras_socket_meas_accept(measMasterIn);
323 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
324 answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
326 gras_socket_close(measMasterIn);
327 gras_socket_close(measIn);
328 gras_socket_close(measOut);
329 /* FIXME: tell error to remote ? */
330 RETHROW0("Error encountered while opening the meas socket: %s");
/* Message types awaited after each run; the push order gives the indices
 * used by the switch below (0: "BW stop", 1: "BW reask").
 * NOTE(review): whether this creation is guarded so that it happens only
 * once is not visible in this excerpt. */
334 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
335 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
336 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
/* Receive the experiment data, then send the (1,1)-sized acknowledgement */
343 gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
344 gras_socket_meas_send(measOut,120,1,1);
346 gras_socket_close(measMasterIn);
347 gras_socket_close(measIn);
348 gras_socket_close(measOut);
349 /* FIXME: tell error to remote ? */
350 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait (timeout argument: 60) for one of the awaited messages; msggot
   receives its index in msgtwaited and payload its content */
352 gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
354 case 0: /* BW stop */
357 case 1: /* BW reask */
/* NOTE(review): payload is cast directly to bw_request_t here, whereas it
 * is dereferenced as a bw_request_t* at function entry -- confirm what
 * gras_msg_wait_or stores in it. */
360 request = (bw_request_t)payload;
361 VERB0("Return the reasking RPC");
362 gras_msg_rpcreturn(60,ctx_reask,NULL);
364 gras_msg_cb_ctx_free(ctx_reask);
/* Release the measurement sockets.
 * NOTE(review): here the conditional close is on measMasterIn and measIn
 * is always closed, whereas amok_bw_test() does the opposite -- confirm
 * which one is intended. */
367 if (measIn != measMasterIn)
368 gras_socket_close(measMasterIn);
369 gras_socket_close(measIn);
370 gras_socket_close(measOut);
373 VERB0("BW experiment done.");
378 * \brief request a bandwidth measurement between two remote peers
380 * \arg from_name: Name of the first peer
381 * \arg from_port: port on which the first process is listening for messages
382 * \arg to_name: Name of the second peer
383 * \arg to_port: port on which the second process is listening (for messages, do not
384 * give a measurement socket here. The needed measurement sockets will be created
385 * automatically and negotiated between the peers)
386 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
387 * \arg exp_size: Total size of data sent across the network
388 * \arg msg_size: Size of each message sent. (\e exp_size / \e msg_size) messages, rounded up, will be sent.
389 * \arg sec: where the result (in seconds) should be stored.
390 * \arg bw: observed Bandwidth (in byte/s)
392 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
393 * This call is blocking until the end of the experiment.
395 * Results are reported in last args, and sizes are in bytes.
397 void amok_bw_request(const char* from_name,unsigned int from_port,
398 const char* to_name,unsigned int to_port,
399 unsigned long int buf_size,
400 unsigned long int exp_size,
401 unsigned long int msg_size,
403 /*OUT*/ double *sec, double*bw) {
/* Overview of the visible steps: build a request describing the
 * experiment and its target (to_name:to_port), send it to
 * from_name:from_port with the "BW request" RPC, log the answer and close
 * the socket.
 * NOTE(review): the copy of the answer into *sec and *bw, and the release
 * of request and result, are presumably done on lines not visible in this
 * excerpt. */
407 bw_request_t request;
/* Describe the wanted experiment */
409 request=xbt_new0(s_bw_request_t,1);
410 request->buf_size=buf_size;
411 request->exp_size=exp_size;
412 request->msg_size=msg_size;
413 request->min_duration = min_duration;
/* The peer field designates the target of the test; the name is not
   copied, only its pointer is stored (constness cast away) */
416 request->peer.name = (char*)to_name;
417 request->peer.port = to_port;
/* Contact the process asked to conduct the test */
420 sock = gras_socket_client(from_name,from_port);
424 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
/* "BW request" RPC (timeout argument: 20*60); the answer lands in result */
425 gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
/* NOTE(review): the duration is read as result->sec here, while the data
 * description built in amok_bw_bw_init() names that field "seconds" --
 * confirm against the declaration of s_bw_res_t. */
432 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
433 from_name,from_port, to_name,to_port,
434 result->sec,((double)result->bw)/1024.0);
436 gras_socket_close(sock);
/* Callback to the "BW request" message: opens a socket to the peer named
 * in the request, has the measurement conducted with the requested sizes
 * and minimal duration, and returns the result to the asker.
 * NOTE(review): some statements of this function are not visible in this
 * excerpt. */
441 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
444 /* specification of the test to run, and our answer */
445 bw_request_t request = *(bw_request_t*)payload;
446 bw_res_t result = xbt_new0(s_bw_res_t,1);
447 gras_socket_t peer,asker;
449 asker=gras_msg_cb_ctx_from(ctx);
450 VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",
451 gras_socket_peer_name(asker),gras_socket_peer_port(asker),
453 request->peer.name,request->peer.port);
/* Regular socket to the target of the experiment */
454 peer = gras_socket_client(request->peer.name,request->peer.port);
/* Arguments of the measurement call, whose first line is not visible in
   this excerpt (presumably amok_bw_test); results go straight into the
   answer */
456 request->buf_size,request->exp_size,request->msg_size,
457 request->min_duration,
458 &(result->sec),&(result->bw));
/* Answer the RPC with the result (timeout argument: 240) */
460 gras_msg_rpcreturn(240,ctx,&result);
463 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* Release the peer name carried by the request */
464 free(request->peer.name);
471 /** \brief builds a matrix of results of bandwidth measurement */
472 double * amok_bw_matrix(xbt_dynar_t peers,
473 int buf_size_bw, int exp_size_bw, int msg_size_bw,
474 double min_duration) {
476 /* construction of matrices for bandwidth and latency */
479 int i,j,len=xbt_dynar_length(peers);
481 double *matrix_res = xbt_new0(double, len*len);
484 xbt_dynar_foreach (peers,i,p1) {
485 xbt_dynar_foreach (peers,j,p2) {
487 /* Measurements of Bandwidth */
488 amok_bw_request(p1->name,p1->port,p2->name,p2->port,
489 buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
490 &sec,&matrix_res[i*len + j]);