3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
/* Make amok_bw the default logging category of this file, as a
   subcategory of amok. */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Initialization counter: amok_bw_init() runs its guarded setup only while
   this is still 0 and increments it; amok_bw_exit() decrements it. */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this */
/* The block guarded by the counter test runs only while
   _amok_bw_initialized is still 0, i.e. on the first call. The counter is
   then incremented; amok_bw_exit() decrements it.
   NOTE(review): the contents of the guarded block are not visible in this
   listing — confirm which one-time setup it performs, and confirm that the
   increment sits outside the guard. */
26 void amok_bw_init(void) {
28 if (! _amok_bw_initialized) {
36 _amok_bw_initialized++;
39 /** @brief module finalization */
/* Checks whether the module was initialized at all, then decrements the
   counter incremented by amok_bw_init().
   NOTE(review): the statement guarded by the test is not visible here
   (presumably an early return) — confirm. */
40 void amok_bw_exit(void) {
41 if (! _amok_bw_initialized)
47 _amok_bw_initialized--;
50 /* ***************************************************************************
52 * ***************************************************************************/
/* Message callbacks of the bandwidth test, defined further down. They are
   registered by amok_bw_bw_join() and unregistered by amok_bw_bw_leave().
   Their definitions omit `static`, but they still get internal linkage from
   these earlier static declarations (C11 6.2.2p4-5). */
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Describe the wire format of the bandwidth-test payloads and declare the
   message types of the protocol:
     "BW handshake"  RPC, bw_request_t -> bw_request_t (exchange of the two
                     sides' measurement ports)
     "BW reask"      RPC, bw_request_t -> no answer (sizes of the next run)
     "BW stop"       one-way, no payload (end of the experiment)
     "BW request"    RPC, bw_request_t -> bw_res_t (test run on our behalf
                     by a remote process)
   NOTE(review): before C23, `()` gives an old-style definition without a
   prototype; `(void)` would let the compiler check calls. */
56 void amok_bw_bw_init() {
57 gras_datadesc_type_t bw_request_desc, bw_res_desc;
59 /* Build the Bandwidth datatype descriptions */
/* NOTE(review): presumably these fields must be appended in the same order
   and with the same types as the C struct s_bw_request_t (from
   bandwidth_private.h) — keep the two in sync. */
60 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61 gras_datadesc_struct_append(bw_request_desc,"peer",
62 gras_datadesc_by_name("s_xbt_peer_t"));
63 gras_datadesc_struct_append(bw_request_desc,"buf_size",
64 gras_datadesc_by_name("unsigned long int"));
65 gras_datadesc_struct_append(bw_request_desc,"exp_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc,"msg_size",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc,"min_duration",
70 gras_datadesc_by_name("double"));
71 gras_datadesc_struct_close(bw_request_desc);
/* The messages below are declared with the reference type bw_request_t,
   not with the struct description itself. */
72 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* NOTE(review): the second field is labelled "seconds" here, while the C
   code reads result->sec. Confirm that datadesc field names are only
   labels, and that s_bw_res_t really starts with an unsigned int
   timestamp. */
74 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78 gras_datadesc_struct_close(bw_res_desc);
79 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
81 gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
83 gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84 gras_msgtype_declare("BW stop", NULL);
86 gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
/* Register this module's message callbacks: "BW request" is served by
   amok_bw_cb_bw_request and "BW handshake" by amok_bw_cb_bw_handshake.
   amok_bw_bw_leave() undoes exactly these two registrations. */
88 void amok_bw_bw_join() {
89 gras_cb_register(gras_msgtype_by_name("BW request"),
90 &amok_bw_cb_bw_request);
91 gras_cb_register(gras_msgtype_by_name("BW handshake"),
92 &amok_bw_cb_bw_handshake);
/* Unregister the two callbacks registered by amok_bw_bw_join(), so that
   this process no longer serves "BW request" or "BW handshake" messages. */
94 void amok_bw_bw_leave() {
95 gras_cb_unregister(gras_msgtype_by_name("BW request"),
96 &amok_bw_cb_bw_request);
97 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98 &amok_bw_cb_bw_handshake);
102 * \brief bandwidth measurement between localhost and \e peer
104 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
105 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
106 * \arg exp_size: Total size of data sent across the network
107 * \arg msg_size: Size of each message sent. I.e., (\e exp_size / \e msg_size) messages will be sent.
108 * \arg min_duration: The minimum desired duration. When the test message is too small, you tend to measure the latency rather than the bandwidth. This argument lets you force the test to last at least, say, one second.
109 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110 * \arg bw: observed Bandwidth (in byte/s)
112 * Conduct a bandwidth test from the local process to the given peer.
113 * This call is blocking until the end of the experiment.
115 * If the requested experiment lasts less than \a min_duration, another one will be
116 * launched. Sizes (both \a exp_size and \a msg_size) will be multiplied by
117 * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118 * reach \a min_duration). In that case, the reported bandwidth and
119 * duration are those of the last run. \a msg_size cannot exceed 64 MiB
120 * because we need to malloc a block of this size in RL to conduct the
121 * experiment, and we still don't want to hit the swap.
123 * Results are reported in the last args, and sizes are in bytes.
/* Run a bandwidth experiment from this process to the process reachable
   through `peer`. It follows the protocol served by amok_bw_cb_bw_handshake:
   1. open a local measurement server socket and send its port in a
      "BW handshake" RPC; the answer (request_ack) carries the peer's own
      measurement port;
   2. accept the peer's connection on our socket, and connect to its port;
   3. send exp_size bytes in msg_size chunks, wait for a 1-byte ack, and
      time the exchange; if that took less than min_duration, enlarge the
      sizes, warn the peer with a "BW reask" RPC and run again;
   4. send "BW stop" and close the measurement sockets.
   NOTE(review): the `min_duration` parameter and the locals `port` and
   `first_pass` are declared on lines not visible in this listing. */
125 void amok_bw_test(gras_socket_t peer,
126 unsigned long int buf_size,
127 unsigned long int exp_size,
128 unsigned long int msg_size,
130 /*OUT*/ double *sec, double *bw) {
132 /* Measurement sockets for the experiments */
133 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
135 bw_request_t request,request_ack;
/* NOTE(review): `port` is bumped twice per iteration (`++port` in the call
   and `port++` in the loop header), so only the odd ports 5001, 5003, ...,
   9999 are tried. The `port == 10000 -1` failure test below relies on 9999
   being the last port tried, so change both together. */
139 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
141 measMasterIn = gras_socket_server_ext(++port,buf_size,1);
144 if (port == 10000 -1) {
145 RETHROW0("Error caught while opening a measurement socket: %s");
/* Request sent to the peer: the experiment sizes plus our measurement port.
   No host name is sent; the peer uses the sender's address. */
152 request=xbt_new0(s_bw_request_t,1);
153 request->buf_size=buf_size;
154 request->exp_size=exp_size;
155 request->msg_size=msg_size;
156 request->peer.name = NULL;
157 request->peer.port = gras_socket_my_port(measMasterIn);
/* NOTE(review): the last two arguments below hold the same value, and they
   are unsigned long printed with %ld (%lu would match the type). */
158 DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)",
159 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
160 buf_size,request->buf_size);
/* Ask the peer to connect back to our measurement port. Its answer
   (request_ack) carries the port of its own measurement server socket.
   The 15 is presumably the RPC timeout, in seconds. */
163 gras_msg_rpccall(peer,15,
164 gras_msgtype_by_name("BW handshake"),&request, &request_ack);
166 RETHROW0("Error encountered while sending the BW request: %s");
168 measIn = gras_socket_meas_accept(measMasterIn);
171 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
172 request_ack->peer.port,
173 request->buf_size,1);
175 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
176 gras_socket_peer_name(peer),request_ack->peer.port);
178 DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
179 request->exp_size, request->msg_size);
/* Presumably true on every run but the first: the previous run was too
   short, so scale both sizes up before retrying.
   NOTE(review): the condition that selects the x1.1 rescaling over the x4
   fallback is not visible here. If meas_duration can be 0 in the x1.1
   branch, min_duration / meas_duration is infinite, and converting it to
   unsigned long is undefined behaviour (C11 6.3.1.4). The same applies to
   any product too large for unsigned long. */
184 if (first_pass == 0) {
185 double meas_duration=*sec;
187 request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
188 request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
190 request->exp_size = request->exp_size * 4;
191 request->msg_size = request->msg_size * 4;
/* Cap single messages at 64 MiB. */
194 if (request->msg_size > 64*1024*1024)
195 request->msg_size = 64*1024*1024;
/* NOTE(review): the "got ...kb/s" figure divides the exp_size *argument*
   by the last duration. That only matches the amount actually sent on the
   first retry; later retries report a wrong rate. */
197 VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
198 meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
/* Tell the peer that another run, with the sizes now in `request`, follows. */
199 gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
/* Send the payload, then wait for the peer's 1-byte ack (120 is presumably
   the timeout, in seconds). On failure all three sockets are closed and the
   error is rethrown. */
205 gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
206 DEBUG0("Data sent. Wait ACK");
207 gras_socket_meas_recv(measIn,120,1,1);
209 gras_socket_close(measOut);
210 gras_socket_close(measMasterIn);
211 gras_socket_close(measIn);
212 RETHROW0("Unable to conduct the experiment: %s");
/* At this point *sec holds the start time of the run (it is set on a line
   not visible here); it now becomes the duration of the run. *bw is only
   updated when that duration is non-zero. The guard of the CRITICAL message
   is not visible here. */
214 *sec = gras_os_time() - *sec;
215 if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
216 DEBUG1("Experiment done ; it took %f sec", *sec);
218 CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
/* Repeat until a run lasted at least min_duration. */
221 } while (*sec < min_duration);
223 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
/* Release the peer from its wait for "BW stop" / "BW reask". */
225 gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);
/* measIn may be the very socket measMasterIn; avoid closing it twice. */
229 if (measIn != measMasterIn)
230 gras_socket_close(measIn);
231 gras_socket_close(measMasterIn);
232 gras_socket_close(measOut);
236 /* Callback to the "BW handshake" message:
237 opens a server measurement socket,
238 indicates its port in the answer to the "BW handshake" RPC,
239 receives the corresponding data on the measurement socket,
240 closes the measurement socket
/* Peer side of amok_bw_test(), run on receipt of a "BW handshake" RPC.
   It opens a measurement server socket on a port taken from 6000..10000 and
   returns that port in the RPC answer. It then connects back to the
   requester's measurement port and accepts the requester's connection.
   After that it receives one experiment and acknowledges it with 1 byte,
   then waits for "BW stop" or "BW reask". A "BW reask" carries the sizes of
   the next run and is answered immediately. The loop around this exchange
   is not visible here; presumably it repeats until "BW stop" arrives. */
244 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx,
246 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
247 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
248 bw_request_t request=*(bw_request_t*)payload;
253 gras_msg_cb_ctx_t ctx_reask;
254 static xbt_dynar_t msgtwaited=NULL;
256 DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
257 gras_socket_peer_name(expeditor),request->peer.port,
258 request->buf_size,request->exp_size,request->msg_size);
260 /* Build our answer */
261 answer = xbt_new0(s_bw_request_t,1);
/* NOTE(review): this scans 6000..10000 inclusive, while amok_bw_test scans
   below 10000 (odd ports only). Harmless, but inconsistent. */
263 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
265 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
271 /* FIXME: tell error to remote */
272 RETHROW0("Error encountered while opening a measurement server socket: %s");
276 answer->buf_size=request->buf_size;
277 answer->exp_size=request->exp_size;
278 answer->msg_size=request->msg_size;
279 answer->peer.port=gras_socket_my_port(measMasterIn);
/* Answer the handshake RPC with our measurement port (60 is presumably the
   timeout, in seconds). */
282 gras_msg_rpcreturn(60,ctx,&answer);
284 gras_socket_close(measMasterIn);
285 /* FIXME: tell error to remote */
286 RETHROW0("Error encountered while sending the answer: %s");
290 /* Don't connect asap to leave time to other side to enter the accept() */
/* Connect back to the requester's measurement server. The port argument is
   on a line not visible here; presumably request->peer.port, as in the
   error message below. */
292 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
294 request->buf_size,1);
296 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
297 gras_socket_peer_name(expeditor),request->peer.port);
298 /* FIXME: tell error to remote */
302 measIn = gras_socket_meas_accept(measMasterIn);
303 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
304 answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
306 gras_socket_close(measMasterIn);
307 gras_socket_close(measIn);
308 gras_socket_close(measOut);
309 /* FIXME: tell error to remote ? */
310 RETHROW0("Error encountered while opening the meas socket: %s");
/* NOTE(review): msgtwaited is function-static. Confirm that the guard
   around this initialization (not visible here) makes it run only once;
   otherwise every handshake leaks a dynar.
   Also, xbt_dynar_push() normally copies the element from the address it
   is given, but here the gras_msgtype_t value itself is passed rather than
   a pointer to it. Check this against the xbt dynar API. */
314 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
315 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
316 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
/* Receive one experiment (exp_size bytes in msg_size chunks), then
   acknowledge it with a single byte. */
323 gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
324 gras_socket_meas_send(measOut,120,1,1);
326 gras_socket_close(measMasterIn);
327 gras_socket_close(measIn);
328 gras_socket_close(measOut);
329 /* FIXME: tell error to remote ? */
330 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait for "BW stop" (index 0) or "BW reask" (index 1), matching the order
   in which they were pushed into msgtwaited. 60 is presumably the timeout.
   The message payload is stored through the callback's own `payload`
   parameter. */
332 gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
334 case 0: /* BW stop */
337 case 1: /* BW reask */
/* NOTE(review): the initial request was read as *(bw_request_t*)payload,
   but here payload is cast directly to bw_request_t. One of the two
   dereference levels is likely wrong; confirm what gras_msg_wait_or()
   stores through its payload argument. */
340 request = (bw_request_t)payload;
341 VERB0("Return the reasking RPC");
342 gras_msg_rpcreturn(60,ctx_reask,NULL);
344 gras_msg_cb_ctx_free(ctx_reask);
/* measIn may be the very socket measMasterIn; close it only once. */
347 if (measIn != measMasterIn)
348 gras_socket_close(measMasterIn);
349 gras_socket_close(measIn);
350 gras_socket_close(measOut);
353 VERB0("BW experiment done.");
358 * \brief request a bandwidth measurement between two remote peers
360 * \arg from_name: Name of the first peer
361 * \arg from_port: port on which the first process is listening for messages
362 * \arg to_name: Name of the second peer
363 * \arg to_port: port on which the second process is listening (for messages; do not
364 * give a measurement socket here. The needed measurement sockets will be created
365 * automatically and negotiated between the peers)
366 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
367 * \arg exp_size: Total size of data sent across the network
368 * \arg msg_size: Size of each message sent. (\e exp_size / \e msg_size) messages will be sent.
369 * \arg sec: where the result (in seconds) should be stored.
370 * \arg bw: observed Bandwidth (in byte/s)
372 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
373 * This call is blocking until the end of the experiment.
375 * Results are reported in last args, and sizes are in bytes.
/* Ask the process listening at from_name:from_port to run amok_bw_test()
   towards to_name:to_port, through a "BW request" RPC served by
   amok_bw_cb_bw_request, and log the result.
   NOTE(review): the `min_duration` parameter, the locals `sock` and
   `result`, and the code that fills *sec and *bw (presumably from result)
   are on lines not visible in this listing. */
377 void amok_bw_request(const char* from_name,unsigned int from_port,
378 const char* to_name,unsigned int to_port,
379 unsigned long int buf_size,
380 unsigned long int exp_size,
381 unsigned long int msg_size,
383 /*OUT*/ double *sec, double*bw) {
387 bw_request_t request;
389 request=xbt_new0(s_bw_request_t,1);
390 request->buf_size=buf_size;
391 request->exp_size=exp_size;
392 request->msg_size=msg_size;
393 request->min_duration = min_duration;
/* const is cast away only so that the name can be sent in the request; the
   visible code never writes through it or frees it. */
396 request->peer.name = (char*)to_name;
397 request->peer.port = to_port;
400 sock = gras_socket_client(from_name,from_port);
/* The long RPC timeout (20*60, presumably seconds) reflects that the remote
   side runs a whole experiment before it answers. */
404 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
405 gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
/* NOTE(review): the ports are unsigned int but are printed with %d. */
412 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
413 from_name,from_port, to_name,to_port,
414 result->sec,((double)result->bw)/1024.0);
416 gras_socket_close(sock);
/* Callback for a "BW request" RPC. It runs a bandwidth test from this
   process towards request->peer with the requested sizes and min_duration,
   and returns the measured duration and bandwidth (a bw_res_t) to the
   asker. */
421 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
424 /* specification of the test to run, and our answer */
425 bw_request_t request = *(bw_request_t*)payload;
426 bw_res_t result = xbt_new0(s_bw_res_t,1);
427 gras_socket_t peer,asker;
429 asker=gras_msg_cb_ctx_from(ctx);
430 VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",
431 gras_socket_peer_name(asker),gras_socket_peer_port(asker),
433 request->peer.name,request->peer.port);
/* Open a regular connection to the target, then run the experiment.
   The first line of that call is not visible here; its arguments match
   amok_bw_test(peer, buf, exp, msg, min_duration, &sec, &bw). The results
   are written straight into *result. */
434 peer = gras_socket_client(request->peer.name,request->peer.port);
436 request->buf_size,request->exp_size,request->msg_size,
437 request->min_duration,
438 &(result->sec),&(result->bw));
/* Return the result to the asker (240 is presumably the timeout, in
   seconds). */
440 gras_msg_rpcreturn(240,ctx,&result);
443 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* NOTE(review): presumably the name string was allocated when the incoming
   request was unpacked, so freeing it here is this callback's job; confirm. */
444 free(request->peer.name);
451 /** \brief builds a matrix of results of bandwidth measurement */
452 double * amok_bw_matrix(xbt_dynar_t peers,
453 int buf_size_bw, int exp_size_bw, int msg_size_bw,
454 double min_duration) {
456 /* construction of matrices for bandwith and latency */
459 int i,j,len=xbt_dynar_length(peers);
461 double *matrix_res = xbt_new0(double, len*len);
464 xbt_dynar_foreach (peers,i,p1) {
465 xbt_dynar_foreach (peers,j,p2) {
467 /* Mesurements of Bandwidth */
468 amok_bw_request(p1->name,p1->port,p2->name,p2->port,
469 buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
470 &sec,&matrix_res[i*len + j]);