3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
/* Declares amok_bw as this file's default log category, a subcategory of amok. */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Initialization counter: incremented by amok_bw_init() and decremented by
 * amok_bw_exit(). The one-time setup in amok_bw_init() is guarded on it
 * being zero. */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this
 *
 * Calls are counted in _amok_bw_initialized: the guarded block below only
 * runs while the counter is still zero (i.e. on the first call), and the
 * counter is incremented.
 * NOTE(review): the statements between the guard and the increment are not
 * shown here; confirm which of them belong to the one-time setup and
 * whether the increment sits inside or outside the guard.
 */
26 void amok_bw_init(void) {
28 if (! _amok_bw_initialized) {
36 _amok_bw_initialized++;
39 /** @brief module finalization
 *
 * Undoes one amok_bw_init() by decrementing _amok_bw_initialized.
 * NOTE(review): the statement guarded by the zero-counter test is not shown
 * here. It presumably returns early so that an unmatched call cannot
 * underflow the counter — confirm.
 */
40 void amok_bw_exit(void) {
41 if (! _amok_bw_initialized)
47 _amok_bw_initialized--;
50 /* ***************************************************************************
52 * ***************************************************************************/
/* Forward declarations of the message callbacks that amok_bw_bw_join()
 * registers. The definitions further down omit `static`; they still get
 * internal linkage because these earlier declarations are `static`. */
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Declares the data descriptions and message types of the bandwidth-test
 * protocol.
 * NOTE(review): presumably called once, from the guarded setup of
 * amok_bw_init() — confirm. The empty parameter list `()` is not a
 * prototype; `(void)` would be the prototype form. */
56 void amok_bw_bw_init() {
57 gras_datadesc_type_t bw_request_desc, bw_res_desc;
59 /* Build the Bandwidth datatype descriptions */
/* The fields are appended in order; presumably this must mirror the C layout
 * of s_bw_request_t (declared in bandwidth_private.h) — keep them in sync. */
60 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61 gras_datadesc_struct_append(bw_request_desc,"peer",
62 gras_datadesc_by_name("s_xbt_peer_t"));
63 gras_datadesc_struct_append(bw_request_desc,"buf_size",
64 gras_datadesc_by_name("unsigned long int"));
65 gras_datadesc_struct_append(bw_request_desc,"msg_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc,"msg_amount",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc,"min_duration",
70 gras_datadesc_by_name("double"));
71 gras_datadesc_struct_close(bw_request_desc);
/* Messages carry the reference type bw_request_t; the code in this file
 * uses it as a pointer (request->field). */
72 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Answer of a "BW request": timestamp, duration and bandwidth. The C code
 * in this file reads these fields as result->sec and result->bw. */
74 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78 gras_datadesc_struct_close(bw_res_desc);
79 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Protocol messages:
 * - "BW handshake" (RPC, request -> request): the initiator sends its
 *   measurement port and the responder answers with its own.
 * - "BW reask" (RPC, request -> no payload): the initiator announces larger
 *   sizes after a run that was too short.
 * - "BW stop" (one-way, no payload): ends the responder's experiment loop.
 * - "BW request" (RPC, request -> bw_res_t): asks a process to run
 *   amok_bw_test() towards a third peer and report the result. */
81 gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
83 gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84 gras_msgtype_declare("BW stop", NULL);
86 gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
/* Registers this process's callbacks for the incoming bandwidth messages.
 * "BW reask" and "BW stop" get no callback: the handshake callback waits
 * for them explicitly with gras_msg_wait_or(). */
88 void amok_bw_bw_join() {
89 gras_cb_register("BW request", &amok_bw_cb_bw_request);
90 gras_cb_register("BW handshake",&amok_bw_cb_bw_handshake);
/* Unregisters the callbacks installed by amok_bw_bw_join(). */
92 void amok_bw_bw_leave() {
93 gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
94 gras_cb_unregister("BW handshake",&amok_bw_cb_bw_handshake);
98 * \brief bandwidth measurement between localhost and \e peer
100 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
101 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
102 * \arg msg_size: Size of each message sent.
103 * \arg msg_amount: Number of such messages to exchange
104 * \arg min_duration: The minimum wanted duration. When the test messages are too small, you tend to measure the latency. This argument allows you to force the test to take at least, say, one second.
105 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
106 * \arg bw: observed Bandwidth (in byte/s)
108 * Conduct a bandwidth test from the local process to the given peer.
109 * This call is blocking until the end of the experiment.
111 * If the asked experiment lasts less than \a min_duration, another one will be
112 * launched (and others, if needed). msg_size will be multiplied by
113 * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
114 * reach the \a min_duration). In that case, the reported bandwidth and
115 * duration are the ones of the last run. \a msg_size cannot go over 64 MiB
116 * because we need to malloc a block of this size in RL to conduct the
117 * experiment, and we still don't want to visit the swap. In such case, the
118 * number of messages is increased instead of their size.
120 * Results are reported in the last args, and sizes are in bytes.
122 * @warning: in SimGrid version 3.1 and earlier, the experiment size was specified
123 * as the total amount of data to send and the msg_size. This
124 * was changed for users wanting to send more than MAXINT
125 * bytes in a fat pipe.
/* Implementation outline:
 * 1. Open a measurement server socket.
 * 2. Handshake with the peer so that each side learns the other's
 *    measurement port.
 * 3. Repeat the timed send/ack exchange, enlarging the experiment via
 *    "BW reask", until a run lasts at least min_duration.
 * 4. Send "BW stop" and close the measurement sockets. */
128 void amok_bw_test(gras_socket_t peer,
129 unsigned long int buf_size,
130 unsigned long int msg_size,
131 unsigned long int msg_amount,
133 /*OUT*/ double *sec, double *bw) {
135 /* Measurement sockets for the experiments */
136 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
138 bw_request_t request,request_ack;
/* NOTE(review): port is advanced twice per iteration, once by the loop's
 * port++ and once by the ++port argument. Only the odd ports 5001, 5003, ...,
 * 9999 are therefore tried. The `port == 10000 -1` test (presumably inside
 * the loop's CATCH, since it uses RETHROW0) matches that last odd port. By
 * contrast, amok_bw_cb_bw_handshake scans 6000..10000 one port at a time. */
142 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
144 measMasterIn = gras_socket_server_ext(++port,buf_size,1);
147 if (port == 10000 -1) {
148 RETHROW0("Error caught while opening a measurement socket: %s");
/* Describe the experiment to the peer. peer.port carries the port of our
 * measurement server socket, so that the peer can connect back to it. */
155 request=xbt_new0(s_bw_request_t,1);
156 request->buf_size=buf_size;
157 request->msg_size=msg_size;
158 request->msg_amount=msg_amount;
159 request->peer.name = NULL;
160 request->peer.port = gras_socket_my_port(measMasterIn);
/* NOTE(review): buf_size/msg_size/msg_amount are unsigned long, so %lu
 * would match them; %ld is used here and in the DEBUG2 below. */
161 DEBUG6("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
162 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
163 request->buf_size,request->msg_size,request->msg_amount);
/* Handshake RPC (15 is presumably the timeout in seconds). request_ack
 * returns the port of the peer's measurement server socket. */
166 gras_msg_rpccall(peer,15,"BW handshake",&request, &request_ack);
168 RETHROW0("Error encountered while sending the BW request: %s");
/* Accept the peer's connection back to our measurement socket. */
170 measIn = gras_socket_meas_accept(measMasterIn);
/* Open our outgoing measurement connection to the port the peer advertised. */
173 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174 request_ack->peer.port,
175 request->buf_size,1);
177 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178 gras_socket_peer_name(peer),request_ack->peer.port);
180 DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
181 request->msg_size, request->msg_amount);
/* Every pass except the first enlarges the experiment, based on the
 * previous run's duration (still held in *sec), before running again. */
186 if (first_pass == 0) {
187 double meas_duration=*sec;
190 increase = (min_duration / meas_duration) * 1.1;
194 /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
198 request->msg_size = request->msg_size * increase;
200 /* Do not do too large experiments messages or the sensors
201 will start to swap to store one of them.
202 And then increase the number of messages to compensate (check for overflow there, too) */
203 if (request->msg_size > 64*1024*1024) {
204 unsigned long int new_amount = ( (request->msg_size / ((double)64*1024*1024))
205 * request->msg_amount ) + 1;
207 xbt_assert0(new_amount > request->msg_amount,
208 "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
209 request->msg_amount = new_amount;
211 request->msg_size = 64*1024*1024;
/* NOTE(review): the "got %fMb/s" value below combines the already-enlarged
 * msg_size/msg_amount with the previous run's duration (*sec), so it
 * overstates the bandwidth actually observed. It only affects this log line. */
214 VERB5("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
215 meas_duration, min_duration,
216 request->msg_size, request->msg_amount,
217 ((double)request->msg_size) * ((double)request->msg_amount / (*sec) /1024.0/1024.0));
/* Tell the peer the new sizes before the next run. */
219 gras_msg_rpccall(peer, 60, "BW reask",&request, NULL);
/* Timed run: send msg_amount messages of msg_size bytes, then wait for the
 * peer's acknowledgement (presumably a single 1-byte message). */
225 gras_socket_meas_send(measOut,120,request->msg_size,request->msg_amount);
226 DEBUG0("Data sent. Wait ACK");
227 gras_socket_meas_recv(measIn,120,1,1);
/* NOTE(review): unlike the normal cleanup at the end of this function, this
 * error path closes measIn and measMasterIn unconditionally. That closes the
 * same socket twice when the two are identical. */
229 gras_socket_close(measOut);
230 gras_socket_close(measMasterIn);
231 gras_socket_close(measIn);
232 RETHROW0("Unable to conduct the experiment: %s");
/* *sec presumably held the run's start time; it now becomes the duration. */
234 *sec = gras_os_time() - *sec;
/* Observed bandwidth in bytes per second. */
236 *bw = ((double)request->msg_size) * ((double)request->msg_amount) / (*sec);
238 DEBUG1("Experiment done ; it took %f sec", *sec);
240 CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
243 } while (*sec < min_duration);
/* The run was long enough: stop the peer's experiment loop. */
245 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
247 gras_msg_send(peer, "BW stop", NULL);
/* measIn may be the very same socket as measMasterIn; avoid closing it twice. */
251 if (measIn != measMasterIn)
252 gras_socket_close(measIn);
253 gras_socket_close(measMasterIn);
254 gras_socket_close(measOut);
258 /* Callback for the "BW handshake" message:
259 opens a server measurement socket,
260 indicates its port in the answer to the "BW handshake" RPC,
261 receives the corresponding data on the measurement socket,
262 closes the measurement sockets
/* Responder side of amok_bw_test():
 * - answers the handshake with the port of a fresh measurement server socket;
 * - connects back to the initiator;
 * - serves timed runs, acknowledging each one, until "BW stop" arrives;
 *   "BW reask" replaces the request with new sizes. */
266 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx,
268 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
269 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
/* The callback payload points at the received bw_request_t, hence the
 * dereference. */
270 bw_request_t request=*(bw_request_t*)payload;
275 gras_msg_cb_ctx_t ctx_reask;
/* Kept across calls (static). The allocation further down is presumably
 * guarded so it happens only once — confirm, the guard is not shown here. */
276 static xbt_dynar_t msgtwaited=NULL;
278 DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
279 gras_socket_peer_name(expeditor),request->peer.port,
280 request->buf_size,request->msg_size, request->msg_amount);
282 /* Build our answer */
283 answer = xbt_new0(s_bw_request_t,1);
/* Find a free port for our measurement server socket, one port at a time. */
285 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
287 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
293 /* FIXME: tell error to remote */
294 RETHROW0("Error encountered while opening a measurement server socket: %s");
/* Echo the experiment parameters and advertise our measurement port. */
298 answer->buf_size=request->buf_size;
299 answer->msg_size=request->msg_size;
300 answer->msg_amount=request->msg_amount;
301 answer->peer.port=gras_socket_my_port(measMasterIn);
304 gras_msg_rpcreturn(60,ctx,&answer);
306 gras_socket_close(measMasterIn);
307 /* FIXME: tell error to remote */
308 RETHROW0("Error encountered while sending the answer: %s");
312 /* Don't connect asap to leave time to other side to enter the accept() */
/* Connect back to the initiator's measurement socket (the port used is
 * request->peer.port, per the error message below). */
314 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
316 request->buf_size,1);
318 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
319 gras_socket_peer_name(expeditor),request->peer.port);
320 /* FIXME: tell error to remote */
/* Accept the initiator's incoming measurement connection. */
324 measIn = gras_socket_meas_accept(measMasterIn);
325 DEBUG4("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
326 answer->buf_size,answer->msg_size,answer->msg_amount, answer->peer.port);
328 gras_socket_close(measMasterIn);
329 gras_socket_close(measIn);
330 gras_socket_close(measOut);
331 /* FIXME: tell error to remote ? */
332 RETHROW0("Error encountered while opening the meas socket: %s");
/* Messages that may follow a run: index 0 is "BW stop" and index 1 is
 * "BW reask", matching the switch cases below.
 * NOTE(review): verify against the xbt_dynar_push contract. If it copies
 * elmsize bytes from the address it is given, then passing the
 * gras_msgtype_t value itself (instead of the address of a variable
 * holding it) stores the start of the msgtype rather than the pointer. */
336 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
337 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
338 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
/* One run: receive msg_amount messages of msg_size bytes, then send the
 * acknowledgement the initiator waits for. */
345 gras_socket_meas_recv(measIn, 120,request->msg_size,request->msg_amount);
346 gras_socket_meas_send(measOut,120,1,1);
348 gras_socket_close(measMasterIn);
349 gras_socket_close(measIn);
350 gras_socket_close(measOut);
351 /* FIXME: tell error to remote ? */
352 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait up to 60 (presumably seconds) for "BW stop" or "BW reask". */
354 gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
356 case 0: /* BW stop */
359 case 1: /* BW reask */
/* payload was passed by address to gras_msg_wait_or(). It presumably now
 * holds the new bw_request_t itself, hence no dereference here (unlike at
 * function entry) — confirm against gras_msg_wait_or. */
362 request = (bw_request_t)payload;
363 VERB0("Return the reasking RPC");
364 gras_msg_rpcreturn(60,ctx_reask,NULL);
366 gras_msg_cb_ctx_free(ctx_reask);
/* When measIn and measMasterIn are the same socket, it is closed only once
 * (through measIn). */
369 if (measIn != measMasterIn)
370 gras_socket_close(measMasterIn);
371 gras_socket_close(measIn);
372 gras_socket_close(measOut);
375 VERB0("BW experiment done.");
380 * \brief request a bandwidth measurement between two remote peers
382 * \arg from_name: Name of the first peer
383 * \arg from_port: port on which the first process is listening for messages
384 * \arg to_name: Name of the second peer
385 * \arg to_port: port on which the second process is listening (for messages, do not
386 * give a measurement socket here. The needed measurement sockets will be created
387 * automatically and negotiated between the peers)
388 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
389 * \arg msg_size: Size of each message sent.
390 * \arg msg_amount: Number of such messages to exchange
391 * \arg sec: where the result (in seconds) should be stored.
392 * \arg bw: observed Bandwidth (in byte/s)
394 * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
395 * This call is blocking until the end of the experiment.
397 * @warning: in SimGrid version 3.1 and earlier, the experiment size was specified
398 * as the total amount of data to send and the msg_size. This
399 * was changed for users wanting to send more than MAXINT
400 * bytes in a fat pipe.
402 * Results are reported in last args, and sizes are in bytes.
/* Implementation: sends a "BW request" RPC to from_name:from_port. That
 * process then runs amok_bw_test() towards to_name:to_port and answers
 * with the timing and the bandwidth. */
404 void amok_bw_request(const char* from_name,unsigned int from_port,
405 const char* to_name,unsigned int to_port,
406 unsigned long int buf_size,
407 unsigned long int msg_size,
408 unsigned long int msg_amount,
410 /*OUT*/ double *sec, double*bw) {
414 bw_request_t request;
416 request=xbt_new0(s_bw_request_t,1);
417 request->buf_size=buf_size;
418 request->msg_size=msg_size;
419 request->msg_amount=msg_amount;
420 request->min_duration = min_duration;
/* The target peer that the remote process must measure towards. const is
 * cast away only to store the caller's string in the message struct.
 * NOTE(review): this side presumably must not free or modify it — confirm
 * ownership. */
423 request->peer.name = (char*)to_name;
424 request->peer.port = to_port;
/* Control connection to the process that will run the experiment. */
427 sock = gras_socket_client(from_name,from_port);
431 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
/* Generous timeout (20*60, presumably seconds), because the remote side
 * runs a whole amok_bw_test() before answering. */
432 gras_msg_rpccall(sock,20*60,"BW request", &request, &result);
/* NOTE(review): result->bw is in bytes/s, so the value logged here is in
 * KiB/s despite the "kb/s" label. The ports are unsigned int but are printed
 * with %d. */
439 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
440 from_name,from_port, to_name,to_port,
441 result->sec,((double)result->bw)/1024.0);
443 gras_socket_close(sock);
/* Callback for "BW request": runs amok_bw_test() from this process towards
 * the peer named in the request, and returns the measured duration and
 * bandwidth to the asker in a bw_res_t. */
448 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
451 /* specification of the test to run, and our answer */
452 bw_request_t request = *(bw_request_t*)payload;
453 bw_res_t result = xbt_new0(s_bw_res_t,1);
454 gras_socket_t peer,asker;
456 asker=gras_msg_cb_ctx_from(ctx);
/* NOTE(review): msg_size/msg_amount are unsigned long; %lu would match
 * them better than %ld. */
457 VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
458 gras_socket_peer_name(asker),gras_socket_peer_port(asker),
460 request->peer.name,request->peer.port,
461 request->msg_size,request->msg_amount);
/* Connect to the target peer and run the test towards it. The results are
 * written directly into result->sec and result->bw. */
462 peer = gras_socket_client(request->peer.name,request->peer.port);
464 request->buf_size,request->msg_size,request->msg_amount,
465 request->min_duration,
466 &(result->sec),&(result->bw));
/* Answer the asker's RPC (240 is presumably the timeout in seconds). */
468 gras_msg_rpcreturn(240,ctx,&result);
471 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* The peer name was presumably allocated when the request was received
 * (the sender only lent its string), so it is released here — confirm. */
472 free(request->peer.name);
479 /** \brief builds a matrix of results of bandwidth measurement
481 * @warning: in SimGrid version 3.1 and earlier, the experiment size was specified
482 * as the total amount of data to send and the msg_size. This
483 * was changed for users wanting to send more than MAXINT
484 * bytes in a fat pipe.
486 double * amok_bw_matrix(xbt_dynar_t peers,
487 int buf_size_bw, int msg_size_bw, int msg_amount_bw,
488 double min_duration) {
490 /* construction of matrices for bandwith and latency */
493 int i,j,len=xbt_dynar_length(peers);
495 double *matrix_res = xbt_new0(double, len*len);
498 xbt_dynar_foreach (peers,i,p1) {
499 xbt_dynar_foreach (peers,j,p2) {
501 /* Mesurements of Bandwidth */
502 amok_bw_request(p1->name,p1->port,p2->name,p2->port,
503 buf_size_bw,msg_size_bw,msg_amount_bw,min_duration,
504 &sec,&matrix_res[i*len + j]);