3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "amok/Bandwidth/bandwidth_private.h"
12 #include "gras/messages.h"
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
16 static short _amok_bw_initialized = 0;
18 /** @brief module initialization; all participating nodes must run this */
19 void amok_bw_init(void) {
20 gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
24 if (! _amok_bw_initialized) {
26 /* Build the datatype descriptions */
27 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
28 gras_datadesc_struct_append(bw_request_desc,"host",
29 gras_datadesc_by_name("xbt_host_t"));
30 gras_datadesc_struct_append(bw_request_desc,"buf_size",
31 gras_datadesc_by_name("unsigned long int"));
32 gras_datadesc_struct_append(bw_request_desc,"exp_size",
33 gras_datadesc_by_name("unsigned long int"));
34 gras_datadesc_struct_append(bw_request_desc,"msg_size",
35 gras_datadesc_by_name("unsigned long int"));
36 gras_datadesc_struct_close(bw_request_desc);
37 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
39 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
40 gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
41 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
42 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
43 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
44 gras_datadesc_struct_close(bw_res_desc);
45 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
47 sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
48 gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
49 gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
50 gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
51 gras_datadesc_struct_close(sat_request_desc);
52 sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
54 /* Register the bandwidth messages */
55 gras_msgtype_declare("BW handshake", bw_request_desc);
56 gras_msgtype_declare("BW handshake ACK", bw_request_desc);
57 gras_msgtype_declare("BW request", bw_request_desc);
58 gras_msgtype_declare("BW result", bw_res_desc);
60 /* Register the saturation messages */
61 gras_msgtype_declare("SAT start", sat_request_desc);
62 gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
63 gras_msgtype_declare("SAT begin", sat_request_desc);
64 gras_msgtype_declare("SAT begun", gras_datadesc_by_name("amok_remoterr_t"));
65 gras_msgtype_declare("SAT end", NULL);
66 gras_msgtype_declare("SAT ended", gras_datadesc_by_name("amok_remoterr_t"));
67 gras_msgtype_declare("SAT stop", NULL);
68 gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
71 /* Register the callbacks */
72 gras_cb_register(gras_msgtype_by_name("BW request"),
73 &amok_bw_cb_bw_request);
74 gras_cb_register(gras_msgtype_by_name("BW handshake"),
75 &amok_bw_cb_bw_handshake);
77 gras_cb_register(gras_msgtype_by_name("SAT start"),
78 &amok_bw_cb_sat_start);
79 gras_cb_register(gras_msgtype_by_name("SAT begin"),
80 &amok_bw_cb_sat_begin);
82 _amok_bw_initialized =1;
85 /** @brief module finalization */
86 void amok_bw_exit(void) {
87 if (! _amok_bw_initialized)
90 gras_cb_unregister(gras_msgtype_by_name("BW request"),
91 &amok_bw_cb_bw_request);
92 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
93 &amok_bw_cb_bw_handshake);
95 gras_cb_unregister(gras_msgtype_by_name("SAT start"),
96 &amok_bw_cb_sat_start);
97 gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
98 &amok_bw_cb_sat_begin);
100 _amok_bw_initialized = 0;
103 /* ***************************************************************************
105 * ***************************************************************************/
108 * \brief bandwidth measurement between localhost and \e peer
110 * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
111 * \arg buf_size: Size of the socket buffer
112 * \arg exp_size: Total size of data sent across the network
113 * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
114 * \arg sec: where the result (in seconds) should be stored.
115 * \arg bw: observed Bandwidth (in kb/s)
117 * Conduct a bandwidth test from the local process to the given peer.
118 * This call is blocking until the end of the experiment.
120 * Results are reported in last args, and sizes are in kb.
122 void amok_bw_test(gras_socket_t peer,
123 unsigned long int buf_size,
124 unsigned long int exp_size,
125 unsigned long int msg_size,
126 /*OUT*/ double *sec, double *bw) {
128 /* Measurement sockets for the experiments */
129 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
131 bw_request_t request,request_ack;
134 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
136 measMasterIn = gras_socket_server_ext(++port,buf_size,1);
142 RETHROW0("Error caught while opening a measurement socket: %s");
147 request=xbt_new0(s_bw_request_t,1);
148 request->buf_size=buf_size*1024;
149 request->exp_size=exp_size*1024;
150 request->msg_size=msg_size*1024;
151 request->host.name = NULL;
152 request->host.port = gras_socket_my_port(measMasterIn);
153 VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)",
154 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
155 buf_size,request->buf_size);
158 gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request);
160 RETHROW0("Error encountered while sending the BW request: %s");
162 measIn = gras_socket_meas_accept(measMasterIn);
165 gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),NULL,&request_ack);
167 RETHROW0("Error encountered while waiting for the answer to BW request: %s");
170 /* FIXME: What if there is a remote error? */
173 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174 request_ack->host.port,
175 request->buf_size,1);
177 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178 gras_socket_peer_name(peer),request_ack->host.port);
180 DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
184 gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
185 gras_socket_meas_recv(measIn,120,1,1);
187 gras_socket_close(measOut);
188 gras_socket_close(measMasterIn);
189 gras_socket_close(measIn);
190 RETHROW0("Unable to conduct the experiment: %s");
193 *sec = gras_os_time() - *sec;
194 *bw = ((double)exp_size) / *sec;
198 if (measIn != measMasterIn)
199 gras_socket_close(measIn);
200 gras_socket_close(measMasterIn);
201 gras_socket_close(measOut);
205 /* Callback to the "BW handshake" message:
206 opens a server measurement socket,
207 indicate its port in an "BW handshaked" message,
208 receive the corresponding data on the measurement socket,
209 close the measurment socket
211 sizes are in byte (got converted from kb my expeditor)
213 int amok_bw_cb_bw_handshake(gras_socket_t expeditor,
215 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
216 bw_request_t request=*(bw_request_t*)payload;
221 VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
222 gras_socket_peer_name(expeditor),request->host.port,
223 request->buf_size,request->exp_size,request->msg_size);
225 /* Build our answer */
226 answer = xbt_new0(s_bw_request_t,1);
228 for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
230 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
236 /* FIXME: tell error to remote */
237 RETHROW0("Error encountered while opening a measurement server socket: %s");
241 answer->buf_size=request->buf_size;
242 answer->exp_size=request->exp_size;
243 answer->msg_size=request->msg_size;
244 answer->host.port=gras_socket_my_port(measMasterIn);
246 /* Don't connect asap to leave time to other side to enter the accept() */
248 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
250 request->buf_size,1);
252 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
253 gras_socket_peer_name(expeditor),request->host.port);
254 /* FIXME: tell error to remote */
258 gras_msg_send(expeditor, gras_msgtype_by_name("BW handshake ACK"), &answer);
260 gras_socket_close(measMasterIn);
261 gras_socket_close(measOut);
262 /* FIXME: tell error to remote */
263 RETHROW0("Error encountered while sending the answer: %s");
267 measIn = gras_socket_meas_accept(measMasterIn);
268 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
269 answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
271 gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
272 gras_socket_meas_send(measOut,120,1,1);
274 gras_socket_close(measMasterIn);
275 gras_socket_close(measIn);
276 gras_socket_close(measOut);
277 /* FIXME: tell error to remote ? */
278 RETHROW0("Error encountered while receiving the experiment: %s");
281 if (measIn != measMasterIn)
282 gras_socket_close(measMasterIn);
283 gras_socket_close(measIn);
284 gras_socket_close(measOut);
287 DEBUG0("BW experiment done.");
292 * \brief request a bandwidth measurement between two remote hosts
294 * \arg from_name: Name of the first host
295 * \arg from_port: port on which the first process is listening for messages
296 * \arg to_name: Name of the second host
297 * \arg to_port: port on which the second process is listening (for messages, do not
298 * give a measurement socket here. The needed measurement sockets will be created
299 * automatically and negociated between the peers)
300 * \arg buf_size: Size of the socket buffer
301 * \arg exp_size: Total size of data sent across the network
302 * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
303 * \arg sec: where the result (in seconds) should be stored.
304 * \arg bw: observed Bandwidth (in kb/s)
306 * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
307 * This call is blocking until the end of the experiment.
309 * Results are reported in last args, and sizes are in kb.
311 void amok_bw_request(const char* from_name,unsigned int from_port,
312 const char* to_name,unsigned int to_port,
313 unsigned long int buf_size,
314 unsigned long int exp_size,
315 unsigned long int msg_size,
316 /*OUT*/ double *sec, double*bw) {
320 bw_request_t request;
323 request=xbt_new0(s_bw_request_t,1);
324 request->buf_size=buf_size;
325 request->exp_size=exp_size;
326 request->msg_size=msg_size;
328 request->host.name = (char*)to_name;
329 request->host.port = to_port;
331 sock = gras_socket_client(from_name,from_port);
332 gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request);
335 gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result);
340 VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
341 from_name,from_port, to_name,to_port,
344 gras_socket_close(sock);
348 int amok_bw_cb_bw_request(gras_socket_t expeditor,
351 /* specification of the test to run, and our answer */
352 bw_request_t request = *(bw_request_t*)payload;
353 bw_res_t result = xbt_new0(s_bw_res,1);
356 peer = gras_socket_client(request->host.name,request->host.port);
358 request->buf_size,request->exp_size,request->msg_size,
359 &(result->sec),&(result->bw));
361 gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result);
364 gras_socket_close(peer);
371 int amok_bw_cb_sat_start(gras_socket_t expeditor,
373 CRITICAL0("amok_bw_cb_sat_start; not implemented");
376 int amok_bw_cb_sat_begin(gras_socket_t expeditor,
378 CRITICAL0("amok_bw_cb_sat_begin: not implemented");