3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Initialization counter of the module: incremented by amok_bw_init() and
   decremented by amok_bw_exit(). */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
/* True only while the counter is zero. */
28 if (! _amok_bw_initialized) {
/* Count this call; amok_bw_exit() decrements the same counter. */
36 _amok_bw_initialized++;
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
/* Taken when the initialization counter is zero. */
41 if (! _amok_bw_initialized)
/* Undo one amok_bw_init() on the counter. */
47 _amok_bw_initialized--;
50 /* ***************************************************************************
52 * ***************************************************************************/
/* Prototypes of the two message callbacks: amok_bw_bw_join() and
   amok_bw_bw_leave() reference them before their definitions. */
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Declares the data descriptions and the message types used by the
 * bandwidth tests.
 *
 * Two structure descriptions are built field by field, closed, and then
 * wrapped with gras_datadesc_ref() under the names "bw_request_t" and
 * "bw_res_t":
 *  - "s_bw_request_t": peer, buf_size, msg_size, msg_amount, min_duration
 *  - "s_bw_res_t":     timestamp, seconds, bw
 * Four message types are declared afterwards: "BW handshake", "BW reask",
 * "BW stop" and "BW request".
 */
56 void amok_bw_bw_init() {
57 gras_datadesc_type_t bw_request_desc, bw_res_desc;
59 /* Build the Bandwidth datatype descriptions */
60 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61 gras_datadesc_struct_append(bw_request_desc,"peer",
62 gras_datadesc_by_name("s_xbt_peer_t"));
63 gras_datadesc_struct_append(bw_request_desc,"buf_size",
64 gras_datadesc_by_name("unsigned long int"));
65 gras_datadesc_struct_append(bw_request_desc,"msg_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc,"msg_amount",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc,"min_duration",
70 gras_datadesc_by_name("double"));
71 gras_datadesc_struct_close(bw_request_desc);
/* From here on bw_request_desc designates the "bw_request_t" reference
   description rather than the structure description itself. */
72 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* NOTE(review): the second field is described as "seconds", whereas the code
   in this file accesses the result through result->sec; confirm that the
   description matches the C structure. */
74 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78 gras_datadesc_struct_close(bw_res_desc);
/* Same rebinding as for the request: bw_res_desc now designates "bw_res_t". */
79 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* RPC declared with the request description for both of its payloads. */
81 gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
/* RPC declared with the request description and NULL; amok_bw_test() calls it
   with a NULL answer argument. */
83 gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
/* Plain message declared with a NULL payload description. */
84 gras_msgtype_declare("BW stop", NULL);
/* RPC declared with the request description and the result description. */
86 gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
/* Registers the callbacks of the "BW request" and "BW handshake" messages. */
88 void amok_bw_bw_join() {
89 gras_cb_register("BW request", &amok_bw_cb_bw_request);
90 gras_cb_register("BW handshake",&amok_bw_cb_bw_handshake);
/* Unregisters the two callbacks that amok_bw_bw_join() registers. */
92 void amok_bw_bw_leave() {
93 gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
94 gras_cb_unregister("BW handshake",&amok_bw_cb_bw_handshake);
98 * \brief bandwidth measurement between localhost and \e peer
100 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
101 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
102 * \arg msg_size: Size of each message sent.
103 * \arg msg_amount: Amount of such messages to exchange
104 * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
105 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
106 * \arg bw: observed Bandwidth (in byte/s)
108 * Conduct a bandwidth test from the local process to the given peer.
109 * This call is blocking until the end of the experiment.
111 * If the asked experiment lasts less than \a min_duration, another one will be
112 * launched (and others, if needed). msg_size will be multiplied by
113 * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
114 * reach the \a min_duration). In that case, the reported bandwidth and
115 * duration are the ones of the last run. \a msg_size cannot go over 64Mb
116 * because we need to malloc a block of this size in RL to conduct the
117 * experiment, and we still don't want to visit the swap. In such case, the
118 * number of messages is increased instead of their size.
120 * Results are reported in last args, and sizes are in byte.
122 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
123 * as the total amount of data to send and the msg_size. This
124 * was changed for the fool wanting to send more than MAXINT
125 * bytes in a fat pipe.
/* Runs a bandwidth experiment towards peer; the duration of the run is stored
   in *sec and the observed bandwidth in *bw. */
128 void amok_bw_test(gras_socket_t peer,
129 unsigned long int buf_size,
130 unsigned long int msg_size,
131 unsigned long int msg_amount,
133 /*OUT*/ double *sec, double *bw) {
135 /* Measurement sockets for the experiments */
136 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
/* request is what gets sent to the peer; request_ack receives the answer of
   the handshake. */
138 bw_request_t request,request_ack;
/* Try ports until a measurement server socket could be opened.
   NOTE(review): port is advanced twice per iteration (port++ in the loop
   header and ++port in the call below), so only every other port is
   attempted; confirm this is intended. */
142 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
144 measMasterIn = gras_socket_server_ext(++port,buf_size,1);
/* Once port has reached 9999 the error is rethrown with this message. */
147 if (port == 10000 -1) {
148 RETHROW0("Error caught while opening a measurement socket: %s");
/* Describe the experiment for the peer. peer.port carries the port of our own
   measurement server socket; the log line below calls it the port the peer
   connects back on. */
155 request=xbt_new0(s_bw_request_t,1);
156 request->buf_size=buf_size;
157 request->msg_size=msg_size;
158 request->msg_amount=msg_amount;
159 request->peer.name = NULL;
160 request->peer.port = gras_socket_my_port(measMasterIn);
161 DEBUG6("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
162 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
163 request->buf_size,request->msg_size,request->msg_amount);
/* Handshake RPC on the regular socket; the answer is stored in request_ack. */
166 gras_msg_rpccall(peer,15,
167 gras_msgtype_by_name("BW handshake"),&request, &request_ack);
169 RETHROW0("Error encountered while sending the BW request: %s");
/* Incoming measurement connection, accepted on our server socket. */
171 measIn = gras_socket_meas_accept(measMasterIn);
/* Outgoing measurement connection, to the port given in the handshake answer. */
174 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
175 request_ack->peer.port,
176 request->buf_size,1);
178 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
179 gras_socket_peer_name(peer),request_ack->peer.port);
181 DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
182 request->msg_size, request->msg_amount);
/* Redo path, taken when first_pass is 0: enlarge the experiment using the
   duration currently held in *sec. */
187 if (first_pass == 0) {
188 double meas_duration=*sec;
/* Ratio of the wanted duration to the measured one, plus 10%. */
191 increase = (min_duration / meas_duration) * 1.1;
195 /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
199 request->msg_size = request->msg_size * increase;
201 /* Do not do too large experiments messages or the sensors
202 will start to swap to store one of them.
203 And then increase the number of messages to compensate (check for overflow there, too) */
204 if (request->msg_size > 64*1024*1024) {
/* Scale the number of messages by the factor by which msg_size exceeds the
   64*1024*1024 cap, plus one. */
205 unsigned long int new_amount = ( (request->msg_size / ((double)64*1024*1024))
206 * request->msg_amount ) + 1;
/* A new_amount that is not larger than the previous amount is reported as an
   overflow. */
208 xbt_assert0(new_amount > request->msg_amount,
209 "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
210 request->msg_amount = new_amount;
212 request->msg_size = 64*1024*1024;
215 VERB5("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
216 meas_duration, min_duration,
217 request->msg_size, request->msg_amount,
218 ((double)request->msg_size) * ((double)request->msg_amount / (*sec) /1024.0/1024.0));
/* Send the updated request to the peer through the "BW reask" RPC; the answer
   argument is NULL. */
220 gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
/* The run itself: send on measOut with msg_size and msg_amount, then wait for
   the acknowledgment on measIn (both trailing arguments are 1). */
226 gras_socket_meas_send(measOut,120,request->msg_size,request->msg_amount);
227 DEBUG0("Data sent. Wait ACK");
228 gras_socket_meas_recv(measIn,120,1,1);
/* NOTE(review): these closes followed by RETHROW0 look like an error path;
   the construct enclosing them is not among the visible lines -- confirm. */
230 gras_socket_close(measOut);
231 gras_socket_close(measMasterIn);
232 gras_socket_close(measIn);
233 RETHROW0("Unable to conduct the experiment: %s");
/* Elapsed time: current time minus the value previously held in *sec
   (presumably the start time; its assignment is not visible here -- confirm). */
235 *sec = gras_os_time() - *sec;
/* Bandwidth = msg_size * msg_amount divided by the elapsed seconds. */
237 *bw = ((double)request->msg_size) * ((double)request->msg_amount) / (*sec);
239 DEBUG1("Experiment done ; it took %f sec", *sec);
241 CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
/* Loop again as long as the measured duration is below min_duration. */
244 } while (*sec < min_duration);
246 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
/* Send the "BW stop" message to the peer, with a NULL payload. */
248 gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);
/* Release the measurement sockets; measIn is closed on its own only when it
   is a different socket than measMasterIn. */
252 if (measIn != measMasterIn)
253 gras_socket_close(measIn);
254 gras_socket_close(measMasterIn);
255 gras_socket_close(measOut);
259 /* Callback to the "BW handshake" message:
260 opens a server measurement socket,
261 indicate its port in a "BW handshaked" message,
262 receive the corresponding data on the measurement socket,
263 close the measurement socket
/* Callback of the "BW handshake" message (registered in amok_bw_bw_join()). */
267 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx,
/* Socket the handshake came from, as given by gras_msg_cb_ctx_from(). */
269 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
270 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
/* payload is read as a pointer to a bw_request_t. */
271 bw_request_t request=*(bw_request_t*)payload;
/* Receives the context of the message obtained by gras_msg_wait_or() below. */
276 gras_msg_cb_ctx_t ctx_reask;
/* static: the same dynar is kept across invocations of this callback. */
277 static xbt_dynar_t msgtwaited=NULL;
279 DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
280 gras_socket_peer_name(expeditor),request->peer.port,
281 request->buf_size,request->msg_size, request->msg_amount);
283 /* Build our answer */
284 answer = xbt_new0(s_bw_request_t,1);
/* Try ports 6000 to 10000 (inclusive) until a measurement server socket could
   be opened. */
286 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
288 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
294 /* FIXME: tell error to remote */
295 RETHROW0("Error encountered while opening a measurement server socket: %s");
/* The answer repeats the sizes of the request and announces the port of our
   own measurement server socket. */
299 answer->buf_size=request->buf_size;
300 answer->msg_size=request->msg_size;
301 answer->msg_amount=request->msg_amount;
302 answer->peer.port=gras_socket_my_port(measMasterIn);
/* Answer the handshake RPC. */
305 gras_msg_rpcreturn(60,ctx,&answer);
/* NOTE(review): close + RETHROW0 below look like the failure path of the
   rpcreturn; the construct enclosing them is not visible -- confirm. */
307 gras_socket_close(measMasterIn);
308 /* FIXME: tell error to remote */
309 RETHROW0("Error encountered while sending the answer: %s");
313 /* Don't connect asap to leave time to other side to enter the accept() */
/* Measurement connection back to the expeditor. */
315 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
317 request->buf_size,1);
319 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
320 gras_socket_peer_name(expeditor),request->peer.port);
321 /* FIXME: tell error to remote */
/* Accept the measurement connection on our server socket. */
325 measIn = gras_socket_meas_accept(measMasterIn);
326 DEBUG4("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
327 answer->buf_size,answer->msg_size,answer->msg_amount, answer->peer.port);
329 gras_socket_close(measMasterIn);
330 gras_socket_close(measIn);
331 gras_socket_close(measOut);
332 /* FIXME: tell error to remote ? */
333 RETHROW0("Error encountered while opening the meas socket: %s");
/* Message types waited for after a run, pushed in the order that matches the
   case labels below (0: "BW stop", 1: "BW reask"). */
337 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
338 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
339 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
/* One run: receive with the msg_size and msg_amount of the current request,
   then send back the acknowledgment (both trailing arguments are 1). */
346 gras_socket_meas_recv(measIn, 120,request->msg_size,request->msg_amount);
347 gras_socket_meas_send(measOut,120,1,1);
349 gras_socket_close(measMasterIn);
350 gras_socket_close(measIn);
351 gras_socket_close(measOut);
352 /* FIXME: tell error to remote ? */
353 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait for one of the messages listed in msgtwaited; the outputs go to
   ctx_reask, msggot and payload. */
355 gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
357 case 0: /* BW stop */
360 case 1: /* BW reask */
/* The payload of the reask becomes the request used from now on. */
363 request = (bw_request_t)payload;
364 VERB0("Return the reasking RPC");
365 gras_msg_rpcreturn(60,ctx_reask,NULL);
367 gras_msg_cb_ctx_free(ctx_reask);
/* Release the sockets; measMasterIn is closed on its own only when it is a
   different socket than measIn. */
370 if (measIn != measMasterIn)
371 gras_socket_close(measMasterIn);
372 gras_socket_close(measIn);
373 gras_socket_close(measOut);
376 VERB0("BW experiment done.");
381 * \brief request a bandwidth measurement between two remote peers
383 * \arg from_name: Name of the first peer
384 * \arg from_port: port on which the first process is listening for messages
385 * \arg to_name: Name of the second peer
386 * \arg to_port: port on which the second process is listening (for messages, do not
387 * give a measurement socket here. The needed measurement sockets will be created
388 * automatically and negotiated between the peers)
389 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
390 * \arg msg_size: Size of each message sent.
391 * \arg msg_amount: Amount of such data to exchange
392 * \arg sec: where the result (in seconds) should be stored.
393 * \arg bw: observed Bandwidth (in byte/s)
395 * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
396 * This call is blocking until the end of the experiment.
398 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
399 * as the total amount of data to send and the msg_size. This
400 * was changed for the fool wanting to send more than MAXINT
401 * bytes in a fat pipe.
403 * Results are reported in last args, and sizes are in bytes.
/* Sends a "BW request" RPC to the process at from_name:from_port, with a
   request that designates to_name:to_port as the peer. */
405 void amok_bw_request(const char* from_name,unsigned int from_port,
406 const char* to_name,unsigned int to_port,
407 unsigned long int buf_size,
408 unsigned long int msg_size,
409 unsigned long int msg_amount,
411 /*OUT*/ double *sec, double*bw) {
415 bw_request_t request;
/* Experiment parameters forwarded in the request. */
417 request=xbt_new0(s_bw_request_t,1);
418 request->buf_size=buf_size;
419 request->msg_size=msg_size;
420 request->msg_amount=msg_amount;
421 request->min_duration = min_duration;
/* The request designates the second peer; to_name is stored as is (a cast,
   not a copy). */
424 request->peer.name = (char*)to_name;
425 request->peer.port = to_port;
/* Regular socket to the first peer. */
428 sock = gras_socket_client(from_name,from_port);
432 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
/* RPC call whose second argument is 20*60; the answer is stored in result. */
433 gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
/* The bandwidth is divided by 1024 for the "kb/s" figure of the log line. */
440 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
441 from_name,from_port, to_name,to_port,
442 result->sec,((double)result->bw)/1024.0);
444 gras_socket_close(sock);
/* Callback of the "BW request" message (registered in amok_bw_bw_join()). */
449 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
452 /* specification of the test to run, and our answer */
453 bw_request_t request = *(bw_request_t*)payload;
454 bw_res_t result = xbt_new0(s_bw_res_t,1);
455 gras_socket_t peer,asker;
/* Socket the request came from; only used for the log line below. */
457 asker=gras_msg_cb_ctx_from(ctx);
458 VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
459 gras_socket_peer_name(asker),gras_socket_peer_port(asker),
461 request->peer.name,request->peer.port,
462 request->msg_size,request->msg_amount);
/* Regular socket to the peer named in the request. */
463 peer = gras_socket_client(request->peer.name,request->peer.port);
/* NOTE(review): the three lines below are the tail of an argument list whose
   first line is not among the visible lines (presumably a call to
   amok_bw_test() -- confirm). result->sec and result->bw are passed by
   address. */
465 request->buf_size,request->msg_size,request->msg_amount,
466 request->min_duration,
467 &(result->sec),&(result->bw));
/* Return result as the answer of the RPC. */
469 gras_msg_rpcreturn(240,ctx,&result);
472 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* The peer name carried by the request is freed here. */
473 free(request->peer.name);
480 /** \brief builds a matrix of results of bandwidth measurement
482 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
483 * as the total amount of data to send and the msg_size. This
484 * was changed for the fool wanting to send more than MAXINT
485 * bytes in a fat pipe.
487 double * amok_bw_matrix(xbt_dynar_t peers,
488 int buf_size_bw, int msg_size_bw, int msg_amount_bw,
489 double min_duration) {
491 /* construction of matrices for bandwidth and latency */
494 int i,j,len=xbt_dynar_length(peers);
496 double *matrix_res = xbt_new0(double, len*len);
499 xbt_dynar_foreach (peers,i,p1) {
500 xbt_dynar_foreach (peers,j,p2) {
502 /* Measurements of Bandwidth */
503 amok_bw_request(p1->name,p1->port,p2->name,p2->port,
504 buf_size_bw,msg_size_bw,msg_amount_bw,min_duration,
505 &sec,&matrix_res[i*len + j]);