3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
/* Declares the "amok_bw" logging subcategory (parent: "amok"). As the default
 * category of this file it is presumably the one used by the DEBUGn/VERBn
 * calls below -- TODO confirm against the xbt log documentation. */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Initialization counter of the module: tested and incremented by
 * amok_bw_init(), tested and decremented by amok_bw_exit(). */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this
 *
 * Tests the module counter _amok_bw_initialized to guard a section that is
 * only entered while the counter is still zero, and increments the counter.
 * NOTE(review): the statements protected by the guard are not visible here;
 * presumably they perform the one-time declarations (see amok_bw_bw_init()).
 */
26 void amok_bw_init(void) {
/* Only true while the module has not been initialized yet */
30 if (! _amok_bw_initialized) {
/* Record one more user of the module; amok_bw_exit() undoes this */
38 _amok_bw_initialized++;
41 /** @brief module finalization
 *
 * Counterpart of amok_bw_init(): checks whether the module was initialized
 * at all, and decrements the counter that amok_bw_init() incremented.
 */
42 void amok_bw_exit(void) {
/* Special-case a module that was never initialized (counter still zero) */
43 if (! _amok_bw_initialized)
/* One user less */
49 _amok_bw_initialized--;
52 /* ***************************************************************************
54 * ***************************************************************************/
/* Forward declarations of the message callbacks. They are attached to the
 * "BW handshake" and "BW request" message types in amok_bw_bw_join() and
 * detached in amok_bw_bw_leave(). */
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Declares the data descriptions and the message types used by the bandwidth
 * tests: the "s_bw_request_t" and "s_bw_res_t" structures, references to
 * them ("bw_request_t" / "bw_res_t"), and the messages "BW handshake",
 * "BW reask", "BW stop" and "BW request". */
58 void amok_bw_bw_init() {
59 gras_datadesc_type_t bw_request_desc, bw_res_desc;
61 /* Build the Bandwidth datatype descriptions */
/* Request: host to contact, the three sizes, and the minimal duration */
62 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63 gras_datadesc_struct_append(bw_request_desc,"host",
64 gras_datadesc_by_name("s_xbt_host_t"));
65 gras_datadesc_struct_append(bw_request_desc,"buf_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc,"exp_size",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc,"msg_size",
70 gras_datadesc_by_name("unsigned long int"));
71 gras_datadesc_struct_append(bw_request_desc,"min_duration",
72 gras_datadesc_by_name("double"));
73 gras_datadesc_struct_close(bw_request_desc);
/* From here on bw_request_desc designates the reference type "bw_request_t";
 * the callbacks below indeed handle bw_request_t values through '->' */
74 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Result: timestamp, duration and bandwidth.
 * NOTE(review): the duration is described as "seconds" here while the C code
 * of this file accesses it as result->sec -- confirm that description names
 * need not match the member names. */
76 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
77 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
78 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
79 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
80 gras_datadesc_struct_close(bw_res_desc);
81 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* RPC sent by amok_bw_test(); both the request and the answer are bw_request_t */
83 gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
/* RPC carrying a new bw_request_t, declared with a NULL answer type */
85 gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
/* Plain message declared with a NULL payload type; ends the experiment */
86 gras_msgtype_declare("BW stop", NULL);
/* RPC sent by amok_bw_request(): a bw_request_t goes out, a bw_res_t comes back */
88 gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
/* Registers the callbacks of this module: amok_bw_cb_bw_request for
 * "BW request" messages and amok_bw_cb_bw_handshake for "BW handshake"
 * messages. Undone by amok_bw_bw_leave(). */
90 void amok_bw_bw_join() {
91 gras_cb_register(gras_msgtype_by_name("BW request"),
92 &amok_bw_cb_bw_request);
93 gras_cb_register(gras_msgtype_by_name("BW handshake"),
94 &amok_bw_cb_bw_handshake);
/* Unregisters the two callbacks that amok_bw_bw_join() attached to the
 * "BW request" and "BW handshake" message types. */
96 void amok_bw_bw_leave() {
97 gras_cb_unregister(gras_msgtype_by_name("BW request"),
98 &amok_bw_cb_bw_request);
99 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
100 &amok_bw_cb_bw_handshake);
104 * \brief bandwidth measurement between localhost and \e peer
106 * \arg peer: A (regular) socket on which the host we should conduct the experiment with can be contacted
107 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
108 * \arg exp_size: Total size of data sent across the network
109 * \arg msg_size: Size of each message sent. I.e., (\e exp_size / \e msg_size) messages will be sent.
110 * \arg min_duration: The minimum wanted duration. When the test message is too small, you tend to measure the latency. This argument allows you to force the test to take at least, say, one second.
111 * \arg sec: where the result (in seconds) should be stored.
112 * \arg bw: observed Bandwidth (in byte/s)
114 * Conduct a bandwidth test from the local process to the given peer.
115 * This call is blocking until the end of the experiment.
117 * Results are reported in the last arguments, and sizes are in bytes.
119 void amok_bw_test(gras_socket_t peer,
120 unsigned long int buf_size,
121 unsigned long int exp_size,
122 unsigned long int msg_size,
124 /*OUT*/ double *sec, double *bw) {
126 /* Measurement sockets for the experiments */
127 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
/* request: what we send to the peer; request_ack: what the peer answers */
129 bw_request_t request,request_ack;
/* Look for a port on which the measurement server socket can be opened,
 * stopping as soon as one socket was obtained.
 * NOTE(review): port is advanced both by ++port in the call below and by
 * port++ in the loop header, so only every other port is tried -- confirm
 * that this is intended. */
132 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
134 measMasterIn = gras_socket_server_ext(++port,buf_size,1);
/* Only the failure on the last candidate port is propagated to the caller */
137 if (port == 10000 -1) {
138 RETHROW0("Error caught while opening a measurement socket: %s");
/* Build the handshake request: the sizes of the experiment, plus the port of
 * our measurement server socket so that the peer can connect back to it.
 * The host name is left NULL. */
145 request=xbt_new0(s_bw_request_t,1);
146 request->buf_size=buf_size;
147 request->exp_size=exp_size;
148 request->msg_size=msg_size;
149 request->host.name = NULL;
150 request->host.port = gras_socket_my_port(measMasterIn);
/* NOTE(review): the message announces "expsize" but the two values printed
 * are buf_size and request->buf_size; buf_size is also an unsigned long
 * printed with %ld. */
151 DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)",
152 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
153 buf_size,request->buf_size);
/* Send the request; the answer lands in request_ack
 * (15: presumably a timeout in seconds -- TODO confirm) */
156 gras_msg_rpccall(peer,15,
157 gras_msgtype_by_name("BW handshake"),&request, &request_ack);
159 RETHROW0("Error encountered while sending the BW request: %s");
/* Incoming measurement socket: accepted on our measurement server socket */
161 measIn = gras_socket_meas_accept(measMasterIn);
/* Outgoing measurement socket: towards the port announced in the answer */
164 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
165 request_ack->host.port,
166 request->buf_size,1);
168 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
169 gras_socket_peer_name(peer),request_ack->host.port);
171 DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
/* Retry path: *sec still holds the duration of the previous, too short, run.
 * Both sizes are scaled by the ratio between wanted and measured duration,
 * plus a 10% margin. */
176 double meas_duration=*sec;
177 request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
178 request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
/* NOTE(review): the rate printed last is computed from the exp_size argument
 * of the function, not from the (possibly already rescaled) size of the
 * previous run -- looks wrong from the second retry on, confirm. */
181 DEBUG5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
182 meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
/* Give the new sizes to the peer and wait for it to return the RPC */
183 gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
184 DEBUG0("Peer is ready for another round of fun");
/* The experiment itself: send the data on the outgoing measurement socket,
 * then wait for the peer's acknowledgment (sizes 1,1) on the incoming one */
189 gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
190 DEBUG0("Data sent. Wait ACK");
191 gras_socket_meas_recv(measIn,120,1,1);
/* Error path: release the three sockets before propagating the error */
193 gras_socket_close(measOut);
194 gras_socket_close(measMasterIn);
195 gras_socket_close(measIn);
196 RETHROW0("Unable to conduct the experiment: %s");
198 DEBUG0("Experiment done");
/* The subtraction implies that *sec held the starting date of the run;
 * it becomes the elapsed time, and *bw the size sent divided by that time */
200 *sec = gras_os_time() - *sec;
201 *bw = ((double)request->exp_size) / *sec;
/* Redo the experiment while it lasted less than the wanted minimum */
202 } while (*sec < min_duration);
204 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
/* Tell the peer that no further round will follow */
206 gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);
/* Release the sockets; measIn is closed separately only when it is not the
 * very same socket as measMasterIn, so that nothing is closed twice */
210 if (measIn != measMasterIn)
211 gras_socket_close(measIn);
212 gras_socket_close(measMasterIn);
213 gras_socket_close(measOut);
217 /* Callback to the "BW handshake" message:
218 opens a server measurement socket,
219 indicates its port in the answer to the "BW handshake" RPC,
220 receives the corresponding data on the measurement socket,
221 closes the measurement sockets
225 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx,
/* Socket of the process which sent us the handshake */
227 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
228 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
/* The payload is read as a pointer to a bw_request_t */
229 bw_request_t request=*(bw_request_t*)payload;
/* Context of the message received while waiting for "BW stop"/"BW reask" */
234 gras_msg_cb_ctx_t ctx_reask;
/* Message types waited for after each round; static, so the dynar outlives
 * the call */
235 static xbt_dynar_t msgtwaited=NULL;
237 DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
238 gras_socket_peer_name(expeditor),request->host.port,
239 request->buf_size,request->exp_size,request->msg_size);
241 /* Build our answer */
242 answer = xbt_new0(s_bw_request_t,1);
/* Look for a port on which our measurement server socket can be opened */
244 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
246 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
252 /* FIXME: tell error to remote */
253 RETHROW0("Error encountered while opening a measurement server socket: %s");
/* The answer repeats the sizes of the request and announces the port of our
 * measurement server socket */
257 answer->buf_size=request->buf_size;
258 answer->exp_size=request->exp_size;
259 answer->msg_size=request->msg_size;
260 answer->host.port=gras_socket_my_port(measMasterIn);
/* Answer the handshake RPC; when this fails, the server socket is released
 * before the error is propagated */
263 gras_msg_rpcreturn(60,ctx,&answer);
265 gras_socket_close(measMasterIn);
266 /* FIXME: tell error to remote */
267 RETHROW0("Error encountered while sending the answer: %s");
271 /* Don't connect asap to leave time to other side to enter the accept() */
/* Outgoing measurement socket, back to the expeditor */
273 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
275 request->buf_size,1);
277 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
278 gras_socket_peer_name(expeditor),request->host.port);
279 /* FIXME: tell error to remote */
/* Incoming measurement socket, accepted on our measurement server socket */
283 measIn = gras_socket_meas_accept(measMasterIn);
284 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
285 answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
/* Error path: release the three sockets before propagating the error */
287 gras_socket_close(measMasterIn);
288 gras_socket_close(measIn);
289 gras_socket_close(measOut);
290 /* FIXME: tell error to remote ? */
291 RETHROW0("Error encountered while opening the meas socket: %s");
/* Message types we may get after a round. The push order gives "BW stop"
 * index 0 and "BW reask" index 1, matching the case labels below.
 * NOTE(review): the message type itself is handed to xbt_dynar_push();
 * confirm whether that function expects the address of the element to copy
 * instead. */
295 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
296 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
297 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
/* One round: receive the data of the experiment, then send back the
 * acknowledgment (sizes 1,1) the other side is waiting for */
304 DEBUG0("Recv / Send the experiment");
305 gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
306 gras_socket_meas_send(measOut,120,1,1);
/* Error path: release the three sockets before propagating the error */
309 gras_socket_close(measMasterIn);
310 gras_socket_close(measIn);
311 gras_socket_close(measOut);
312 /* FIXME: tell error to remote ? */
313 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait for one of the two message types; msggot tells which one arrived
 * (60: presumably a timeout in seconds -- TODO confirm) */
315 gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
317 case 0: /* BW stop */
320 case 1: /* BW reask */
/* Adopt the new sizes and return the (empty) answer of the "BW reask" RPC.
 * NOTE(review): payload is cast directly here whereas it is dereferenced as
 * a bw_request_t* at the top of the function -- confirm which form matches
 * what gras_msg_wait_or() stores. */
323 request = (bw_request_t)payload;
324 VERB0("Return the reasking RPC");
325 gras_msg_rpcreturn(60,ctx_reask,NULL);
327 gras_msg_cb_ctx_free(ctx_reask);
/* Release the sockets.
 * NOTE(review): here measMasterIn is the one closed conditionally and measIn
 * unconditionally, the reverse of what amok_bw_test() does -- confirm. */
330 if (measIn != measMasterIn)
331 gras_socket_close(measMasterIn);
332 gras_socket_close(measIn);
333 gras_socket_close(measOut);
336 VERB0("BW experiment done.");
341 * \brief request a bandwidth measurement between two remote hosts
343 * \arg from_name: Name of the first host
344 * \arg from_port: port on which the first process is listening for messages
345 * \arg to_name: Name of the second host
346 * \arg to_port: port on which the second process is listening (for messages, do not
347 * give a measurement socket here. The needed measurement sockets will be created
348 * automatically and negotiated between the peers)
349 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
350 * \arg exp_size: Total size of data sent across the network
351 * \arg msg_size: Size of each message sent. (\e exp_size / \e msg_size) messages will be sent.
 * \arg min_duration: The minimum wanted duration of the test; it is forwarded to the first host in the request.
352 * \arg sec: where the result (in seconds) should be stored.
353 * \arg bw: observed Bandwidth (in byte/s)
355 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
356 * This call is blocking until the end of the experiment.
358 * Results are reported in last args, and sizes are in bytes.
360 void amok_bw_request(const char* from_name,unsigned int from_port,
361 const char* to_name,unsigned int to_port,
362 unsigned long int buf_size,
363 unsigned long int exp_size,
364 unsigned long int msg_size,
366 /*OUT*/ double *sec, double*bw) {
/* Description of the test, sent to the process from_name:from_port */
370 bw_request_t request;
373 request=xbt_new0(s_bw_request_t,1);
374 request->buf_size=buf_size;
375 request->exp_size=exp_size;
376 request->msg_size=msg_size;
377 request->min_duration = min_duration;
/* The host field designates the second host, i.e. the one the first host
 * must run the test against.
 * NOTE(review): the cast drops the const qualifier of to_name; the string is
 * only borrowed here, not copied. */
379 request->host.name = (char*)to_name;
380 request->host.port = to_port;
/* Contact the first host and ask it to conduct the test */
382 sock = gras_socket_client(from_name,from_port);
383 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
/* Blocks until the answer arrives in result
 * (20*60: presumably a timeout of 20 minutes -- TODO confirm) */
385 gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
/* Logs the duration, and the bandwidth divided by 1024 */
392 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
393 from_name,from_port, to_name,to_port,
394 result->sec,((double)result->bw)/1024.0);
396 gras_socket_close(sock);
/* Callback to the "BW request" message: opens a socket to the host named in
 * the request, has the bandwidth measured against it, and returns the result
 * to the asker as the answer of the RPC. */
401 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
404 /* specification of the test to run, and our answer */
405 bw_request_t request = *(bw_request_t*)payload;
406 bw_res_t result = xbt_new0(s_bw_res_t,1);
/* peer: the host to test against; asker: the process which sent the request */
407 gras_socket_t peer,asker;
409 asker=gras_msg_cb_ctx_from(ctx);
410 VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",
411 gras_socket_peer_name(asker),gras_socket_peer_port(asker),
412 request->host.name,request->host.port);
413 peer = gras_socket_client(request->host.name,request->host.port);
/* Arguments of the measurement: the sizes and the minimal duration come from
 * the request, the duration and bandwidth found go directly into the answer */
415 request->buf_size,request->exp_size,request->msg_size,
416 request->min_duration,
417 &(result->sec),&(result->bw));
/* Send the result back to the asker */
419 gras_msg_rpcreturn(240,ctx,&result);
422 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* The host name of the received request is released here */
423 free(request->host.name);
430 /** \brief builds a matrix of results of bandwidth measurement */
431 double * amok_bw_matrix(xbt_dynar_t hosts,
432 int buf_size_bw, int exp_size_bw, int msg_size_bw,
433 double min_duration) {
435 /* construction of matrices for bandwidth and latency */
438 int i,j,len=xbt_dynar_length(hosts);
440 double *matrix_res = xbt_new0(double, len*len);
443 xbt_dynar_foreach (hosts,i,h1) {
444 xbt_dynar_foreach (hosts,j,h2) {
446 /* Measurements of Bandwidth */
447 amok_bw_request(h1->name,h1->port,h2->name,h2->port,
448 buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
449 &sec,&matrix_res[i*len + j]);