3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Module-wide initialization counter: tested and incremented by
 * amok_bw_init(), tested and decremented by amok_bw_exit(). */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this
 *
 * Tests the _amok_bw_initialized counter for zero and increments it.
 * NOTE(review): the statements guarded by the zero test are not visible in
 * this excerpt -- confirm what is done on first call only versus on every
 * call against the complete file. */
26 void amok_bw_init(void) {
/* First-call guard: true only while the counter is still zero. */
28 if (! _amok_bw_initialized) {
/* Record one more initialization. */
36 _amok_bw_initialized++;
39 /** @brief module finalization
 *
 * Tests the _amok_bw_initialized counter for zero and decrements it.
 * NOTE(review): the statement controlled by the zero test, and any cleanup
 * performed around the decrement, are not visible in this excerpt. */
40 void amok_bw_exit(void) {
/* Guard against finalizing a module that was never initialized. */
41 if (! _amok_bw_initialized)
/* Undo one amok_bw_init() call. */
47 _amok_bw_initialized--;
50 /* ***************************************************************************
52 * ***************************************************************************/
/* Forward declarations of the message callbacks. They are attached to the
 * "BW handshake" and "BW request" message types in amok_bw_bw_join() and
 * detached in amok_bw_bw_leave(). */
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Declares the data descriptions and the message types used by the
 * bandwidth tests:
 *  - structure "s_bw_request_t" (and the reference type "bw_request_t");
 *  - structure "s_bw_res_t" (and the reference type "bw_res_t");
 *  - messages "BW handshake", "BW reask", "BW stop" and "BW request".
 * Takes no argument and returns nothing. */
56 void amok_bw_bw_init() {
57 gras_datadesc_type_t bw_request_desc, bw_res_desc;
59 /* Build the Bandwidth datatype descriptions */
/* Request: peer, buf_size, msg_size, msg_amount, min_duration (in that order). */
60 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61 gras_datadesc_struct_append(bw_request_desc,"peer",
62 gras_datadesc_by_name("s_xbt_peer_t"));
63 gras_datadesc_struct_append(bw_request_desc,"buf_size",
64 gras_datadesc_by_name("unsigned long int"));
65 gras_datadesc_struct_append(bw_request_desc,"msg_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc,"msg_amount",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc,"min_duration",
70 gras_datadesc_by_name("double"));
71 gras_datadesc_struct_close(bw_request_desc);
/* From here on bw_request_desc designates the "bw_request_t" reference type. */
72 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Result: timestamp, seconds, bw (in that order).
 * NOTE(review): the field is labelled "seconds" here while the rest of this
 * file accesses the member as result->sec -- confirm the label does not
 * have to match the C member name. */
74 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78 gras_datadesc_struct_close(bw_res_desc);
79 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Message types. Presumably the two descriptions given to
 * gras_msgtype_declare_rpc() are the request payload and the answer
 * payload -- TODO confirm against the GRAS API. */
81 gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
83 gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84 gras_msgtype_declare("BW stop", NULL);
86 gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
/* Registers the callbacks of the bandwidth test: amok_bw_cb_bw_request for
 * "BW request" messages and amok_bw_cb_bw_handshake for "BW handshake"
 * messages. Both message types are looked up by name. */
88 void amok_bw_bw_join() {
89 gras_cb_register(gras_msgtype_by_name("BW request"),
90 &amok_bw_cb_bw_request);
91 gras_cb_register(gras_msgtype_by_name("BW handshake"),
92 &amok_bw_cb_bw_handshake);
/* Counterpart of amok_bw_bw_join(): unregisters amok_bw_cb_bw_request from
 * "BW request" and amok_bw_cb_bw_handshake from "BW handshake". */
94 void amok_bw_bw_leave() {
95 gras_cb_unregister(gras_msgtype_by_name("BW request"),
96 &amok_bw_cb_bw_request);
97 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98 &amok_bw_cb_bw_handshake);
102 * \brief bandwidth measurement between localhost and \e peer
104 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
105 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
106 * \arg msg_size: Size of each message sent.
107 * \arg msg_amount: Amount of such messages to exchange
108 * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110 * \arg bw: observed Bandwidth (in byte/s)
112 * Conduct a bandwidth test from the local process to the given peer.
113 * This call is blocking until the end of the experiment.
115 * If the requested experiment lasts less than \a min_duration, another one will be
116 * launched (and others, if needed). msg_size will be multiplied by
117 * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
118 * reach the \a min_duration). In that case, the reported bandwidth and
119 * duration are the ones of the last run. \a msg_size cannot go over 64MB
120 * because we need to malloc a block of this size in RL to conduct the
121 * experiment, and we still don't want to visit the swap. In that case, the
122 * number of messages is increased instead of their size.
124 * Results are reported in last args, and sizes are in byte.
126 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
127 * as the total amount of data to send and the msg_size. This
128 * was changed for the fool wanting to send more than MAXINT
129 * bytes in a fat pipe.
/* Runs a bandwidth experiment from the local process to `peer`.
 *
 * Visible steps: open a local measurement server socket, handshake with the
 * peer over the regular socket ("BW handshake" RPC), accept the peer's
 * measurement connection and open one back to it, then send
 * msg_amount messages of msg_size bytes and wait for a 1-byte
 * acknowledgement. The run is repeated with a larger experiment while the
 * measured duration stays below min_duration; the peer is then told to stop
 * ("BW stop") and the measurement sockets are closed.
 *
 * Outputs: *sec receives the duration of the last run, *bw the value
 * msg_size * msg_amount / *sec of the last run.
 *
 * NOTE(review): this excerpt is missing lines of the original file. The
 * min_duration parameter, the declarations of port/first_pass/increase, the
 * TRY/CATCH structure and the opening of the do-loop are used below but not
 * visible -- confirm against the complete file. */
132 void amok_bw_test(gras_socket_t peer,
133 unsigned long int buf_size,
134 unsigned long int msg_size,
135 unsigned long int msg_amount,
137 /*OUT*/ double *sec, double *bw) {
139 /* Measurement sockets for the experiments */
140 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
142 bw_request_t request,request_ack;
/* Look for a port on which the measurement server socket can be opened.
 * NOTE(review): port is incremented twice per iteration (port++ in the loop
 * header and ++port in the call), so only every other port number is tried,
 * starting at 5001 -- confirm this is intended. */
146 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
148 measMasterIn = gras_socket_server_ext(++port,buf_size,1);
/* Last candidate port reached: propagate the error. */
151 if (port == 10000 -1) {
152 RETHROW0("Error caught while opening a measurement socket: %s");
/* Describe the experiment; peer.port tells the remote side where to connect back. */
159 request=xbt_new0(s_bw_request_t,1);
160 request->buf_size=buf_size;
161 request->msg_size=msg_size;
162 request->msg_amount=msg_amount;
163 request->peer.name = NULL;
164 request->peer.port = gras_socket_my_port(measMasterIn);
165 DEBUG6("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
166 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
167 request->buf_size,request->msg_size,request->msg_amount);
/* Handshake RPC; the answer is stored in request_ack (its peer.port is used
 * below to open measOut). The literal 15 is presumably a timeout -- TODO confirm. */
170 gras_msg_rpccall(peer,15,
171 gras_msgtype_by_name("BW handshake"),&request, &request_ack);
173 RETHROW0("Error encountered while sending the BW request: %s");
/* Accept the connection the peer opens back to our measurement server socket. */
175 measIn = gras_socket_meas_accept(measMasterIn);
/* Open our outgoing measurement socket to the port announced in the answer. */
178 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
179 request_ack->peer.port,
180 request->buf_size,1);
182 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
183 gras_socket_peer_name(peer),request_ack->peer.port);
185 DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
186 request->msg_size, request->msg_amount);
/* Not the first run: the previous one was too short, so enlarge the experiment. */
191 if (first_pass == 0) {
192 double meas_duration=*sec;
/* Scaling factor aiming at min_duration, with a 10% margin. */
195 increase = (min_duration / meas_duration) * 1.1;
199 /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
203 request->msg_size = request->msg_size * increase;
205 /* Do not do too large experiments messages or the sensors
206 will start to swap to store one of them.
207 And then increase the number of messages to compensate (check for overflow there, too) */
208 if (request->msg_size > 64*1024*1024) {
/* Convert the excess size into additional messages of 64*1024*1024 bytes. */
209 unsigned long int new_amount = ( (request->msg_size / ((double)64*1024*1024))
210 * request->msg_amount ) + 1;
/* A result not larger than the old amount is treated as an overflow. */
212 xbt_assert0(new_amount > request->msg_amount,
213 "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
214 request->msg_amount = new_amount;
216 request->msg_size = 64*1024*1024;
219 VERB5("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
220 meas_duration, min_duration,
221 request->msg_size, request->msg_amount,
222 ((double)request->msg_size) * ((double)request->msg_amount / (*sec) /1024.0/1024.0));
/* Send the new experiment parameters to the peer ("BW reask" RPC, no answer payload). */
224 gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
/* The experiment itself: send the data, then wait for the 1x1-byte acknowledgement. */
230 gras_socket_meas_send(measOut,120,request->msg_size,request->msg_amount);
231 DEBUG0("Data sent. Wait ACK");
232 gras_socket_meas_recv(measIn,120,1,1);
/* Close all three sockets and rethrow.
 * NOTE(review): presumably the body of a CATCH block that is not visible here. */
234 gras_socket_close(measOut);
235 gras_socket_close(measMasterIn);
236 gras_socket_close(measIn);
237 RETHROW0("Unable to conduct the experiment: %s");
/* Elapsed time. NOTE(review): assumes *sec was set to the start time on a
 * line not visible in this excerpt -- confirm. */
239 *sec = gras_os_time() - *sec;
/* Total payload (msg_size * msg_amount) divided by the elapsed time. */
241 *bw = ((double)request->msg_size) * ((double)request->msg_amount) / (*sec);
243 DEBUG1("Experiment done ; it took %f sec", *sec);
/* NOTE(review): the condition guarding this message is not visible here. */
245 CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
/* Repeat as long as the run was shorter than the requested minimum. */
248 } while (*sec < min_duration);
250 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
/* Tell the peer the experiment is over. */
252 gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);
/* Close measIn only when it differs from measMasterIn, so the same socket is not closed twice. */
256 if (measIn != measMasterIn)
257 gras_socket_close(measIn);
258 gras_socket_close(measMasterIn);
259 gras_socket_close(measOut);
263 /* Callback to the "BW handshake" message:
264 opens a server measurement socket,
265 indicates its port in the answer to the "BW handshake" RPC,
266 receives the corresponding data on the measurement socket,
267 closes the measurement sockets
/* Callback attached to "BW handshake" messages (receiver side of a
 * bandwidth test).
 *
 * Visible steps: open a measurement server socket, answer the RPC with the
 * port of that socket, open a measurement socket back to the sender, accept
 * the sender's measurement connection, then receive the experiment data and
 * acknowledge it with a 1x1-byte message. Afterwards it waits for either
 * "BW stop" or "BW reask"; on "BW reask" the request is replaced by the
 * received payload and the reask RPC is answered.
 *
 * ctx:     context of the incoming message (gives the sender socket and is
 *          used to answer the RPC).
 * payload: read as a pointer to a bw_request_t.
 *
 * NOTE(review): this excerpt is missing lines of the original file (the
 * payload parameter line, declarations of answer/port/msggot, TRY/CATCH
 * blocks, the loop and switch structure, the return statement) -- confirm
 * against the complete file. */
271 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx,
/* Socket of the process that sent the handshake. */
273 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
274 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
275 bw_request_t request=*(bw_request_t*)payload;
280 gras_msg_cb_ctx_t ctx_reask;
/* Message types waited for after each run; static, so shared by all invocations. */
281 static xbt_dynar_t msgtwaited=NULL;
283 DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
284 gras_socket_peer_name(expeditor),request->peer.port,
285 request->buf_size,request->msg_size, request->msg_amount);
287 /* Build our answer */
288 answer = xbt_new0(s_bw_request_t,1);
/* Look for a port for the measurement server socket.
 * NOTE(review): the range here is 6000..10000 inclusive, whereas
 * amok_bw_test() starts at 5000 and stops before 10000. */
290 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
292 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
298 /* FIXME: tell error to remote */
299 RETHROW0("Error encountered while opening a measurement server socket: %s");
/* Echo the experiment parameters and announce our measurement port. */
303 answer->buf_size=request->buf_size;
304 answer->msg_size=request->msg_size;
305 answer->msg_amount=request->msg_amount;
306 answer->peer.port=gras_socket_my_port(measMasterIn);
309 gras_msg_rpcreturn(60,ctx,&answer);
/* Close the server socket and rethrow.
 * NOTE(review): presumably the body of a CATCH block that is not visible here. */
311 gras_socket_close(measMasterIn);
312 /* FIXME: tell error to remote */
313 RETHROW0("Error encountered while sending the answer: %s");
317 /* Don't connect asap to leave time to other side to enter the accept() */
319 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
321 request->buf_size,1);
323 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
324 gras_socket_peer_name(expeditor),request->peer.port);
325 /* FIXME: tell error to remote */
/* Accept the measurement connection opened by the sender. */
329 measIn = gras_socket_meas_accept(measMasterIn);
330 DEBUG4("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
331 answer->buf_size,answer->msg_size,answer->msg_amount, answer->peer.port);
333 gras_socket_close(measMasterIn);
334 gras_socket_close(measIn);
335 gras_socket_close(measOut);
336 /* FIXME: tell error to remote ? */
337 RETHROW0("Error encountered while opening the meas socket: %s");
/* Push order fixes the indices used by the cases below: 0 = "BW stop", 1 = "BW reask".
 * NOTE(review): whether this creation is guarded against repeated calls is
 * not visible in this excerpt. */
341 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
342 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
343 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
/* One run: receive msg_amount messages of msg_size bytes, then acknowledge with 1x1 byte. */
350 gras_socket_meas_recv(measIn, 120,request->msg_size,request->msg_amount);
351 gras_socket_meas_send(measOut,120,1,1);
353 gras_socket_close(measMasterIn);
354 gras_socket_close(measIn);
355 gras_socket_close(measOut);
356 /* FIXME: tell error to remote ? */
357 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait for the sender's decision; msggot receives which of the waited types arrived. */
359 gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
361 case 0: /* BW stop */
364 case 1: /* BW reask */
/* Adopt the new experiment parameters carried by the reask. */
367 request = (bw_request_t)payload;
368 VERB0("Return the reasking RPC");
369 gras_msg_rpcreturn(60,ctx_reask,NULL);
371 gras_msg_cb_ctx_free(ctx_reask);
/* Close measMasterIn only when it differs from measIn, so the same socket is not closed twice. */
374 if (measIn != measMasterIn)
375 gras_socket_close(measMasterIn);
376 gras_socket_close(measIn);
377 gras_socket_close(measOut);
380 VERB0("BW experiment done.");
385 * \brief request a bandwidth measurement between two remote peers
387 * \arg from_name: Name of the first peer
388 * \arg from_port: port on which the first process is listening for messages
389 * \arg to_name: Name of the second peer
390 * \arg to_port: port on which the second process is listening (for messages, do not
391 * give a measurement socket here. The needed measurement sockets will be created
392 * automatically and negotiated between the peers)
393 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
394 * \arg msg_size: Size of each message sent.
395 * \arg msg_amount: Amount of such data to exchange
396 * \arg sec: where the result (in seconds) should be stored.
397 * \arg bw: observed Bandwidth (in byte/s)
399 * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
400 * This call is blocking until the end of the experiment.
402 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
403 * as the total amount of data to send and the msg_size. This
404 * was changed for the fool wanting to send more than MAXINT
405 * bytes in a fat pipe.
407 * Results are reported in last args, and sizes are in bytes.
/* Asks the process at from_name:from_port to run a bandwidth test towards
 * to_name:to_port, using a "BW request" RPC.
 *
 * Visible steps: fill a request with the experiment parameters and the
 * target peer, open a client socket to the `from` process, issue the RPC,
 * log the result and close the socket.
 *
 * NOTE(review): this excerpt is missing lines of the original file. The
 * min_duration parameter, the declarations of sock/result, and any code
 * storing the result into *sec and *bw or freeing request/result are not
 * visible -- confirm against the complete file. */
409 void amok_bw_request(const char* from_name,unsigned int from_port,
410 const char* to_name,unsigned int to_port,
411 unsigned long int buf_size,
412 unsigned long int msg_size,
413 unsigned long int msg_amount,
415 /*OUT*/ double *sec, double*bw) {
419 bw_request_t request;
421 request=xbt_new0(s_bw_request_t,1);
422 request->buf_size=buf_size;
423 request->msg_size=msg_size;
424 request->msg_amount=msg_amount;
425 request->min_duration = min_duration;
/* The target of the experiment. peer.name points at the caller's string
 * (const is cast away); no copy is made here. */
428 request->peer.name = (char*)to_name;
429 request->peer.port = to_port;
/* Regular (non-measurement) socket to the process that will conduct the test. */
432 sock = gras_socket_client(from_name,from_port);
436 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
/* The literal 20*60 is presumably a timeout in seconds -- TODO confirm. */
437 gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
/* The bandwidth is divided by 1024 for display only. */
444 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
445 from_name,from_port, to_name,to_port,
446 result->sec,((double)result->bw)/1024.0);
448 gras_socket_close(sock);
/* Callback attached to "BW request" messages: conducts the requested
 * bandwidth test against request->peer and returns the result to the asker
 * through the RPC answer.
 *
 * ctx:     context of the incoming message (gives the asker socket and is
 *          used to answer the RPC).
 * payload: read as a pointer to a bw_request_t.
 *
 * NOTE(review): this excerpt is missing lines of the original file (the
 * payload parameter line, the head of the measurement call, the return
 * statement) -- confirm against the complete file. */
453 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
456 /* specification of the test to run, and our answer */
457 bw_request_t request = *(bw_request_t*)payload;
458 bw_res_t result = xbt_new0(s_bw_res_t,1);
459 gras_socket_t peer,asker;
461 asker=gras_msg_cb_ctx_from(ctx);
462 VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
463 gras_socket_peer_name(asker),gras_socket_peer_port(asker),
465 request->peer.name,request->peer.port,
466 request->msg_size,request->msg_amount);
/* Regular socket to the peer the experiment must be conducted with. */
467 peer = gras_socket_client(request->peer.name,request->peer.port);
/* Arguments of the measurement call; its head is not visible here
 * (presumably amok_bw_test(peer, ...) -- TODO confirm). The outcome is
 * written into result->sec and result->bw. */
469 request->buf_size,request->msg_size,request->msg_amount,
470 request->min_duration,
471 &(result->sec),&(result->bw));
/* Send the result back to the asker. */
473 gras_msg_rpcreturn(240,ctx,&result);
476 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* Release the peer name carried by the request. */
477 free(request->peer.name);
484 /** \brief builds a matrix of results of bandwidth measurement
486 * @warning: in SimGrid version 3.1 and previous, the experiment size was specified
487 * as the total amount of data to send and the msg_size. This
488 * was changed for the fool wanting to send more than MAXINT
489 * bytes in a fat pipe.
/* Measures the bandwidth between the peers of `peers`, pairwise, and stores
 * the results in a newly allocated len*len array of doubles (len being the
 * number of peers). The bandwidth from peer i to peer j is stored at index
 * i*len + j.
 *
 * peers:         dynar of peers; each element is used through its name and
 *                port fields.
 * buf_size_bw, msg_size_bw, msg_amount_bw, min_duration: forwarded to
 *                amok_bw_request() for every measurement. The three int
 *                values are converted to that function's unsigned long int
 *                parameters.
 *
 * NOTE(review): this excerpt is missing lines of the original file (the
 * declarations of p1/p2/sec, any filtering of the i == j pairs, the return
 * statement) -- confirm against the complete file. */
491 double * amok_bw_matrix(xbt_dynar_t peers,
492 int buf_size_bw, int msg_size_bw, int msg_amount_bw,
493 double min_duration) {
495 /* construction of matrices for bandwidth and latency */
498 int i,j,len=xbt_dynar_length(peers);
/* Zero-initialized result matrix, one row per source peer. */
500 double *matrix_res = xbt_new0(double, len*len);
/* p1 is the source of the measurement, p2 its destination. */
503 xbt_dynar_foreach (peers,i,p1) {
504 xbt_dynar_foreach (peers,j,p2) {
506 /* Measurements of Bandwidth */
507 amok_bw_request(p1->name,p1->port,p2->name,p2->port,
508 buf_size_bw,msg_size_bw,msg_amount_bw,min_duration,
509 &sec,&matrix_res[i*len + j]);