3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
/* Log category of this file; presumably the default target of the DEBUGn
 * and VERBn logging macros used below (confirm against xbt/log.h). */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Reference count of the module: amok_bw_init() increments it and
 * amok_bw_exit() decrements it; amok_bw_init() only runs its guarded setup
 * section while the count is zero.
 * NOTE(review): identifiers beginning with '_' are reserved at file scope
 * (C11 7.1.3); a name such as amok_bw_init_count would avoid that. */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this */
/* Reference-counted: the section guarded by the zero test below only runs
 * while the module is not initialized yet; the counter is then incremented
 * (amok_bw_exit() decrements it). */
26 void amok_bw_init(void) {
/* One-time setup, skipped when the module is already initialized. */
28 if (! _amok_bw_initialized) {
36 _amok_bw_initialized++;
39 /** @brief module finalization */
/* Counterpart of amok_bw_init(): decrements the reference count that
 * amok_bw_init() increments. */
40 void amok_bw_exit(void) {
/* Guard against finalizing a module that was never initialized (the guarded
 * statement presumably returns early -- confirm). */
41 if (! _amok_bw_initialized)
47 _amok_bw_initialized--;
50 /* ***************************************************************************
52 * ***************************************************************************/
/* Message callbacks, defined later in this file and registered with
 * gras_cb_register() in amok_bw_bw_join(). Their definitions omit 'static'
 * but keep the internal linkage given by these prior declarations
 * (C11 6.2.2). */
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Registers the data descriptions of the bandwidth request/result structures
 * and declares the message types of the bandwidth test protocol:
 *   "BW handshake"  RPC, bw_request_t -> bw_request_t
 *   "BW reask"      RPC, bw_request_t -> no answer payload
 *   "BW stop"       plain message without payload
 *   "BW request"    RPC, bw_request_t -> bw_res_t
 * NOTE(review): before C23, `()` declares an unspecified parameter list;
 * `(void)` would be the prototype form. */
56 void amok_bw_bw_init() {
57 gras_datadesc_type_t bw_request_desc, bw_res_desc;
59 /* Build the Bandwidth datatype descriptions */
/* Fields are appended one by one; presumably this order must match the
 * layout of struct s_bw_request_t -- confirm against bandwidth_private.h. */
60 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61 gras_datadesc_struct_append(bw_request_desc,"peer",
62 gras_datadesc_by_name("s_xbt_peer_t"));
63 gras_datadesc_struct_append(bw_request_desc,"buf_size",
64 gras_datadesc_by_name("unsigned long int"));
65 gras_datadesc_struct_append(bw_request_desc,"exp_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc,"msg_size",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc,"min_duration",
70 gras_datadesc_by_name("double"));
71 gras_datadesc_struct_close(bw_request_desc);
/* bw_request_t is the pointer type used throughout this file
 * (request->field), described as a reference to s_bw_request_t. */
72 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* NOTE(review): the second field is named "seconds" here while the C code
 * accesses result->sec; if field names matter to the datadesc layer, make
 * them agree. */
74 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78 gras_datadesc_struct_close(bw_res_desc);
79 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Handshake: the answer carries the port of the peer's measurement socket. */
81 gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
/* Sent by the tester between runs: "BW reask" announces another run with new
 * sizes, "BW stop" ends the experiment. */
83 gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84 gras_msgtype_declare("BW stop", NULL);
/* Asks a remote process to run a test towards a third peer. */
86 gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
/* Makes this process answer bandwidth experiments: registers the callbacks
 * for incoming "BW request" and "BW handshake" messages. */
88 void amok_bw_bw_join() {
89 gras_cb_register(gras_msgtype_by_name("BW request"),
90 &amok_bw_cb_bw_request);
91 gras_cb_register(gras_msgtype_by_name("BW handshake"),
92 &amok_bw_cb_bw_handshake);
/* Undoes amok_bw_bw_join(): unregisters the "BW request" and
 * "BW handshake" callbacks. */
94 void amok_bw_bw_leave() {
95 gras_cb_unregister(gras_msgtype_by_name("BW request"),
96 &amok_bw_cb_bw_request);
97 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98 &amok_bw_cb_bw_handshake);
102 * \brief bandwidth measurement between localhost and \e peer
104 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
105 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
106 * \arg exp_size: Total size of data sent across the network
107 * \arg msg_size: Size of each message sent. I.e., (\e exp_size / \e msg_size) messages will be sent.
108 * \arg min_duration: The minimum wanted duration. When the test message is too small, you tend to measure the latency. This argument allows you to force the test to take at least, say, one second.
109 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110 * \arg bw: observed Bandwidth (in byte/s)
112 * Conduct a bandwidth test from the local process to the given peer.
113 * This call is blocking until the end of the experiment.
115 * If the requested experiment lasts less than \a min_duration, another one will be
116 * launched. Sizes (both \a exp_size and \a msg_size) will be multiplied by
117 * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118 * reach the \a min_duration). In that case, the reported bandwidth and
119 * duration are the ones of the last run. \a msg_size cannot go over 64MiB
120 * because we need to malloc a block of this size in RL to conduct the
121 * experiment, and we still don't want to hit the swap.
123 * Results are reported in the last args, and sizes are in bytes.
/* Client side of a bandwidth experiment towards peer; the contract is
 * described in the doc comment above. */
125 void amok_bw_test(gras_socket_t peer,
126 unsigned long int buf_size,
127 unsigned long int exp_size,
128 unsigned long int msg_size,
130 /*OUT*/ double *sec, double *bw) {
132 /* Measurement sockets for the experiments */
/* measIn is assigned by gras_socket_meas_accept() below and may presumably
 * be the master socket itself, hence the measIn != measMasterIn test in the
 * final cleanup. */
133 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
/* request describes the experiment; request_ack is the peer's answer to the
 * handshake. NOTE(review): check that both are freed on every exit path,
 * including the RETHROW ones. */
135 bw_request_t request,request_ack;
/* Look for a free local port for our measurement server socket.
 * NOTE(review): port is incremented twice per iteration (++port in the call
 * and port++ in the loop header), so only the odd ports 5001..9999 are
 * tried; the `port == 10000 -1` test below presumably matches the last
 * attempted port (9999) only because of that parity.
 * amok_bw_cb_bw_handshake() scans 6000..10000 with a single increment. */
138 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
140 measMasterIn = gras_socket_server_ext(++port,buf_size,1);
143 if (port == 10000 -1) {
144 RETHROW0("Error caught while opening a measurement socket: %s");
/* Describe the experiment to the peer: peer.port is where it must connect
 * back to our measurement server socket. */
151 request=xbt_new0(s_bw_request_t,1);
152 request->buf_size=buf_size;
153 request->exp_size=exp_size;
154 request->msg_size=msg_size;
155 request->peer.name = NULL;
156 request->peer.port = gras_socket_my_port(measMasterIn);
/* NOTE(review): the message labels these values "expsize" but passes
 * buf_size twice (buf_size and request->buf_size, equal here); exp_size was
 * probably meant. */
157 DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)",
158 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
159 buf_size,request->buf_size);
/* Handshake RPC (timeout 15): the answer request_ack carries the port of
 * the peer's measurement server socket, used to open measOut below. */
162 gras_msg_rpccall(peer,15,
163 gras_msgtype_by_name("BW handshake"),&request, &request_ack);
165 RETHROW0("Error encountered while sending the BW request: %s");
167 measIn = gras_socket_meas_accept(measMasterIn);
170 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
171 request_ack->peer.port,
172 request->buf_size,1);
174 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
175 gras_socket_peer_name(peer),request_ack->peer.port);
177 DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
/* The previous run was shorter than min_duration: scale both sizes by
 * min_duration/meas_duration plus a 10% margin, and cap msg_size at 64 MiB
 * (64*1024*1024 bytes).
 * NOTE(review): if meas_duration is 0 the ratio is infinite, and converting
 * an out-of-range double to unsigned long is undefined behavior
 * (C11 6.3.1.4); exp_size is not capped at all. */
182 double meas_duration=*sec;
183 request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
184 request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
185 if (request->msg_size > 64*1024*1024)
186 request->msg_size = 64*1024*1024;
/* NOTE(review): the reported rate divides the original exp_size argument,
 * not the size of the run that was just timed, so it is wrong from the
 * second retry on. */
188 VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
189 meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
/* Tell the peer that another run with the new sizes is coming. */
190 gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
/* One run: send the payload, then wait for the peer's 1-byte
 * acknowledgement. */
195 gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
196 DEBUG0("Data sent. Wait ACK");
197 gras_socket_meas_recv(measIn,120,1,1);
/* Error path: release all measurement sockets before propagating. */
199 gras_socket_close(measOut);
200 gras_socket_close(measMasterIn);
201 gras_socket_close(measIn);
202 RETHROW0("Unable to conduct the experiment: %s");
204 DEBUG0("Experiment done");
/* *sec presumably holds the start timestamp of the run at this point; turn
 * it into the elapsed duration and derive the bandwidth in bytes/s. */
206 *sec = gras_os_time() - *sec;
207 *bw = ((double)request->exp_size) / *sec;
208 } while (*sec < min_duration);
210 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
/* Release the peer from its wait for further runs. */
212 gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);
/* Skip measIn when it is the same socket as measMasterIn, so that socket is
 * not closed twice. */
216 if (measIn != measMasterIn)
217 gras_socket_close(measIn);
218 gras_socket_close(measMasterIn);
219 gras_socket_close(measOut);
223 /* Callback to the "BW handshake" message:
224 opens a server measurement socket,
225 indicates its port in the answer to the "BW handshake" RPC,
226 receives the corresponding data on the measurement socket,
227 closes the measurement socket
/* Callback for "BW handshake": answers with the port of a fresh measurement
 * server socket, then serves the experiment runs, reacting to the "BW reask"
 * and "BW stop" messages sent by the requester. */
231 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx,
233 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
234 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
/* The callback payload points to the received bw_request_t. */
235 bw_request_t request=*(bw_request_t*)payload;
240 gras_msg_cb_ctx_t ctx_reask;
/* Message types awaited between runs; the index of each type is the value
 * tested by the switch below (0 = "BW stop", 1 = "BW reask").
 * NOTE(review): since this is static, it must only be created while still
 * NULL, otherwise every call leaks the previous dynar -- confirm the guard. */
241 static xbt_dynar_t msgtwaited=NULL;
243 DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
244 gras_socket_peer_name(expeditor),request->peer.port,
245 request->buf_size,request->exp_size,request->msg_size);
247 /* Build our answer */
248 answer = xbt_new0(s_bw_request_t,1);
/* Find a free port for our measurement server socket.
 * NOTE(review): this scans 6000..10000 inclusive, while amok_bw_test() stops
 * below 10000. */
250 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
252 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
258 /* FIXME: tell error to remote */
259 RETHROW0("Error encountered while opening a measurement server socket: %s");
263 answer->buf_size=request->buf_size;
264 answer->exp_size=request->exp_size;
265 answer->msg_size=request->msg_size;
266 answer->peer.port=gras_socket_my_port(measMasterIn);
/* Answer the handshake RPC: the requester connects to answer->peer.port. */
269 gras_msg_rpcreturn(60,ctx,&answer);
271 gras_socket_close(measMasterIn);
272 /* FIXME: tell error to remote */
273 RETHROW0("Error encountered while sending the answer: %s");
277 /* Don't connect asap to leave time to other side to enter the accept() */
/* Connect back to the port advertised by the requester in
 * request->peer.port. */
279 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
281 request->buf_size,1);
283 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
284 gras_socket_peer_name(expeditor),request->peer.port);
285 /* FIXME: tell error to remote */
289 measIn = gras_socket_meas_accept(measMasterIn);
290 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
291 answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
293 gras_socket_close(measMasterIn);
294 gras_socket_close(measIn);
295 gras_socket_close(measOut);
296 /* FIXME: tell error to remote ? */
297 RETHROW0("Error encountered while opening the meas socket: %s");
301 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
302 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
303 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
/* One run: receive the payload, then send a 1-byte acknowledgement. */
310 gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
311 gras_socket_meas_send(measOut,120,1,1);
313 gras_socket_close(measMasterIn);
314 gras_socket_close(measIn);
315 gras_socket_close(measOut);
316 /* FIXME: tell error to remote ? */
317 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait for the requester's decision; msggot receives the index of the
 * received message type in msgtwaited. */
319 gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
321 case 0: /* BW stop */
324 case 1: /* BW reask */
/* payload was overwritten by gras_msg_wait_or() through &payload, so it
 * presumably holds the bw_request_t itself here, unlike the callback
 * argument dereferenced at the top -- confirm against gras_msg_wait_or(). */
327 request = (bw_request_t)payload;
328 VERB0("Return the reasking RPC");
329 gras_msg_rpcreturn(60,ctx_reask,NULL);
331 gras_msg_cb_ctx_free(ctx_reask);
/* When accept returned the master socket itself, close that socket once. */
334 if (measIn != measMasterIn)
335 gras_socket_close(measMasterIn);
336 gras_socket_close(measIn);
337 gras_socket_close(measOut);
340 VERB0("BW experiment done.");
345 * \brief request a bandwidth measurement between two remote peers
347 * \arg from_name: Name of the first peer
348 * \arg from_port: port on which the first process is listening for messages
349 * \arg to_name: Name of the second peer
350 * \arg to_port: port on which the second process is listening (for messages, do not
351 * give a measurement socket here. The needed measurement sockets will be created
352 * automatically and negotiated between the peers)
353 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
354 * \arg exp_size: Total size of data sent across the network
355 * \arg msg_size: Size of each message sent. (\e exp_size / \e msg_size) messages will be sent.
356 * \arg sec: where the result (in seconds) should be stored.
357 * \arg bw: observed Bandwidth (in byte/s)
359 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
360 * This call is blocking until the end of the experiment.
362 * Results are reported in last args, and sizes are in bytes.
/* Asks the process at from_name:from_port, through a "BW request" RPC, to
 * run a bandwidth experiment towards to_name:to_port, then logs the result. */
364 void amok_bw_request(const char* from_name,unsigned int from_port,
365 const char* to_name,unsigned int to_port,
366 unsigned long int buf_size,
367 unsigned long int exp_size,
368 unsigned long int msg_size,
370 /*OUT*/ double *sec, double*bw) {
374 bw_request_t request;
376 request=xbt_new0(s_bw_request_t,1);
377 request->buf_size=buf_size;
378 request->exp_size=exp_size;
379 request->msg_size=msg_size;
380 request->min_duration = min_duration;
/* The peer to measure against. NOTE(review): const is cast away only to fit
 * the struct field; this pointer aliases the caller's to_name, so it must
 * not be freed on this side. */
383 request->peer.name = (char*)to_name;
384 request->peer.port = to_port;
387 sock = gras_socket_client(from_name,from_port);
391 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
/* Long timeout (20*60): the remote side presumably runs a whole
 * amok_bw_test() before answering. */
392 gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
/* bw is in bytes/s, hence the division by 1024 for kb/s. */
399 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
400 from_name,from_port, to_name,to_port,
401 result->sec,((double)result->bw)/1024.0);
403 gras_socket_close(sock);
/* Callback for "BW request": runs the requested bandwidth experiment from
 * this process towards request->peer and returns the measured duration and
 * bandwidth to the asker. */
408 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
411 /* specification of the test to run, and our answer */
412 bw_request_t request = *(bw_request_t*)payload;
413 bw_res_t result = xbt_new0(s_bw_res_t,1);
414 gras_socket_t peer,asker;
416 asker=gras_msg_cb_ctx_from(ctx);
417 VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",
418 gras_socket_peer_name(asker),gras_socket_peer_port(asker),
420 request->peer.name,request->peer.port);
/* Open a regular socket to the target peer; the experiment itself
 * (presumably amok_bw_test()) fills result->sec and result->bw. */
421 peer = gras_socket_client(request->peer.name,request->peer.port);
423 request->buf_size,request->exp_size,request->msg_size,
424 request->min_duration,
425 &(result->sec),&(result->bw));
/* Answer the asker's RPC with the results (timeout 240). */
427 gras_msg_rpcreturn(240,ctx,&result);
430 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* The peer name belongs to the received request (presumably allocated by
 * the message layer when it was deserialized), hence freed here. */
431 free(request->peer.name);
438 /** \brief builds a matrix of results of bandwidth measurement */
439 double * amok_bw_matrix(xbt_dynar_t peers,
440 int buf_size_bw, int exp_size_bw, int msg_size_bw,
441 double min_duration) {
443 /* construction of matrices for bandwidth and latency */
446 int i,j,len=xbt_dynar_length(peers);
448 double *matrix_res = xbt_new0(double, len*len);
451 xbt_dynar_foreach (peers,i,p1) {
452 xbt_dynar_foreach (peers,j,p2) {
454 /* Measurements of bandwidth */
455 amok_bw_request(p1->name,p1->port,p2->name,p2->port,
456 buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
457 &sec,&matrix_res[i*len + j]);