Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
In amok_bw_test, it is now possible to ask that the measurement lasts at least N...
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   amok_base_init();
29
30   if (! _amok_bw_initialized) {
31     amok_bw_bw_init();
32     amok_bw_sat_init();
33   }
34    
35   amok_bw_bw_join();
36   amok_bw_sat_join();
37
38   _amok_bw_initialized++;
39 }
40
41 /** @brief module finalization */
42 void amok_bw_exit(void) {
43   if (! _amok_bw_initialized)
44     return;
45    
46   amok_bw_bw_leave();
47   amok_bw_sat_leave();
48
49   _amok_bw_initialized--;
50 }
51
52 /* ***************************************************************************
53  * Bandwidth tests
54  * ***************************************************************************/
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
57
58 void amok_bw_bw_init() {
59   gras_datadesc_type_t bw_request_desc, bw_res_desc;
60
61   /* Build the Bandwidth datatype descriptions */ 
62   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63   gras_datadesc_struct_append(bw_request_desc,"host",
64                               gras_datadesc_by_name("s_xbt_host_t"));
65   gras_datadesc_struct_append(bw_request_desc,"buf_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"exp_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"msg_size",
70                               gras_datadesc_by_name("unsigned long int"));
71   gras_datadesc_struct_append(bw_request_desc,"min_duration",
72                               gras_datadesc_by_name("double"));
73   gras_datadesc_struct_close(bw_request_desc);
74   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
75   
76   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
77   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
78   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
79   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
80   gras_datadesc_struct_close(bw_res_desc);
81   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
82   
83   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
84
85   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
86   gras_msgtype_declare("BW stop", NULL);
87
88   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
89 }
90 void amok_bw_bw_join() {
91   gras_cb_register(gras_msgtype_by_name("BW request"),
92                    &amok_bw_cb_bw_request);
93   gras_cb_register(gras_msgtype_by_name("BW handshake"),
94                    &amok_bw_cb_bw_handshake);
95 }
96 void amok_bw_bw_leave() {
97   gras_cb_unregister(gras_msgtype_by_name("BW request"),
98                      &amok_bw_cb_bw_request);
99   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
100                      &amok_bw_cb_bw_handshake);
101 }
102
103 /**
104  * \brief bandwidth measurement between localhost and \e peer
105  * 
106  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
107  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
108  * \arg exp_size: Total size of data sent across the network
109  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
110  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
111  * \arg sec: where the result (in seconds) should be stored.
112  * \arg bw: observed Bandwidth (in byte/s) 
113  *
114  * Conduct a bandwidth test from the local process to the given peer.
115  * This call is blocking until the end of the experiment.
116  *
117  * Results are reported in last args, and sizes are in byte.
118  */
119 void amok_bw_test(gras_socket_t peer,
120                   unsigned long int buf_size,
121                   unsigned long int exp_size,
122                   unsigned long int msg_size,
123                   double min_duration,
124           /*OUT*/ double *sec, double *bw) {
125
126   /* Measurement sockets for the experiments */
127   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
128   int port;
129   bw_request_t request,request_ack;
130   xbt_ex_t e;
131   
132   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
133     TRY {
134       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
135     } CATCH(e) {
136       measMasterIn = NULL;
137       if (port == 10000 -1) {
138         RETHROW0("Error caught while opening a measurement socket: %s");
139       } else {
140         xbt_ex_free(e); 
141       }
142     }
143   }
144   
145   request=xbt_new0(s_bw_request_t,1);
146   request->buf_size=buf_size;
147   request->exp_size=exp_size;
148   request->msg_size=msg_size;
149   request->host.name = NULL;
150   request->host.port = gras_socket_my_port(measMasterIn);
151   DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
152         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
153         buf_size,request->buf_size);
154
155   TRY {
156     gras_msg_rpccall(peer,60,
157                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
158   } CATCH(e) {
159     RETHROW0("Error encountered while sending the BW request: %s");
160   }
161   measIn = gras_socket_meas_accept(measMasterIn);
162    
163   TRY {
164     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
165                                    request_ack->host.port, 
166                                    request->buf_size,1);
167   } CATCH(e) {
168     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
169              gras_socket_peer_name(peer),request_ack->host.port);
170   }
171   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
172
173   *sec = 0;
174   do {
175     if (*sec>0) {
176       double meas_duration=*sec;
177       request->msg_size = request->msg_size * (min_duration / meas_duration);
178
179       DEBUG3("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%ld",
180              meas_duration,min_duration,request->msg_size);
181       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
182     }
183
184     *sec=gras_os_time();
185     TRY {
186       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
187       DEBUG0("Data sent. Wait ACK");
188       gras_socket_meas_recv(measIn,120,1,1);
189     } CATCH(e) {
190       gras_socket_close(measOut);
191       gras_socket_close(measMasterIn);
192       gras_socket_close(measIn);
193       RETHROW0("Unable to conduct the experiment: %s");
194     }
195     DEBUG0("Experiment done");
196
197     *sec = gras_os_time() - *sec;
198     *bw = ((double)exp_size) / *sec;
199   } while (*sec < min_duration);
200
201   DEBUG0("This measurement was long enough. Stop peer");
202   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
203
204   free(request_ack);
205   free(request);
206   if (measIn != measMasterIn)
207     gras_socket_close(measIn);
208   gras_socket_close(measMasterIn);
209   gras_socket_close(measOut);
210 }
211
212
213 /* Callback to the "BW handshake" message: 
214    opens a server measurement socket,
215    indicate its port in an "BW handshaked" message,
216    receive the corresponding data on the measurement socket, 
217    close the measurment socket
218
219    sizes are in byte
220 */
221 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
222                             void          *payload) {
223   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
224   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
225   bw_request_t request=*(bw_request_t*)payload;
226   bw_request_t answer;
227   xbt_ex_t e;
228   int port;
229   int tooshort = 1;
230   gras_msg_cb_ctx_t ctx_reask;
231   static xbt_dynar_t msgtwaited=NULL;
232   
233   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
234         gras_socket_peer_name(expeditor),request->host.port,
235         request->buf_size,request->exp_size,request->msg_size);     
236
237   /* Build our answer */
238   answer = xbt_new0(s_bw_request_t,1);
239   
240   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
241     TRY {
242       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
243     } CATCH(e) {
244       measMasterIn = NULL;
245       if (port < 10000)
246         xbt_ex_free(e);
247       else
248         /* FIXME: tell error to remote */
249         RETHROW0("Error encountered while opening a measurement server socket: %s");
250     }
251   }
252    
253   answer->buf_size=request->buf_size;
254   answer->exp_size=request->exp_size;
255   answer->msg_size=request->msg_size;
256   answer->host.port=gras_socket_my_port(measMasterIn);
257
258   TRY {
259     gras_msg_rpcreturn(60,ctx,&answer);
260   } CATCH(e) { 
261     gras_socket_close(measMasterIn);
262     /* FIXME: tell error to remote */
263     RETHROW0("Error encountered while sending the answer: %s");
264   }
265
266
267   /* Don't connect asap to leave time to other side to enter the accept() */
268   TRY {
269     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
270                                      request->host.port,
271                                      request->buf_size,1);
272   } CATCH(e) {
273     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
274              gras_socket_peer_name(expeditor),request->host.port);
275     /* FIXME: tell error to remote */
276   }
277
278   TRY {
279     measIn = gras_socket_meas_accept(measMasterIn);
280     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
281            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
282   } CATCH(e) {
283     gras_socket_close(measMasterIn);
284     gras_socket_close(measIn);
285     gras_socket_close(measOut);
286     /* FIXME: tell error to remote ? */
287     RETHROW0("Error encountered while opening the meas socket: %s");
288   }
289
290   if (!msgtwaited) {
291     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
292     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
293     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
294   }
295
296   while (tooshort) {
297     void *payload;
298     int msggot;
299     TRY {
300       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
301       gras_socket_meas_send(measOut,120,1,1);
302       DEBUG0("ACK sent");
303     } CATCH(e) {
304       gras_socket_close(measMasterIn);
305       gras_socket_close(measIn);
306       gras_socket_close(measOut);
307       /* FIXME: tell error to remote ? */
308       RETHROW0("Error encountered while receiving the experiment: %s");
309     }
310     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
311     switch(msggot) {
312     case 0: /* BW stop */
313       tooshort = 0;
314       break;
315     case 1: /* BW reask */
316       tooshort = 1;
317       free(request);
318       request = (bw_request_t)payload;
319       gras_msg_rpcreturn(60,ctx_reask,NULL);
320     }
321     gras_msg_cb_ctx_free(ctx_reask);
322   }
323
324   if (measIn != measMasterIn)
325     gras_socket_close(measMasterIn);
326   gras_socket_close(measIn);
327   gras_socket_close(measOut);
328   free(answer);
329   free(request);
330   VERB0("BW experiment done.");
331   return 1;
332 }
333
334 /**
335  * \brief request a bandwidth measurement between two remote hosts
336  *
337  * \arg from_name: Name of the first host 
338  * \arg from_port: port on which the first process is listening for messages
339  * \arg to_name: Name of the second host 
340  * \arg to_port: port on which the second process is listening (for messages, do not 
341  * give a measurement socket here. The needed measurement sockets will be created 
342  * automatically and negociated between the peers)
343  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
344  * \arg exp_size: Total size of data sent across the network
345  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
346  * \arg sec: where the result (in seconds) should be stored.
347  * \arg bw: observed Bandwidth (in byte/s)
348  *
349  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
350  * This call is blocking until the end of the experiment.
351  *
352  * Results are reported in last args, and sizes are in bytes.
353  */
354 void amok_bw_request(const char* from_name,unsigned int from_port,
355                      const char* to_name,unsigned int to_port,
356                      unsigned long int buf_size,
357                      unsigned long int exp_size,
358                      unsigned long int msg_size,
359              /*OUT*/ double *sec, double*bw) {
360   
361   gras_socket_t sock;
362   /* The request */
363   bw_request_t request;
364   bw_res_t result;
365
366   request=xbt_new0(s_bw_request_t,1);
367   request->buf_size=buf_size;
368   request->exp_size=exp_size;
369   request->msg_size=msg_size;
370
371   request->host.name = (char*)to_name;
372   request->host.port = to_port;
373
374   sock = gras_socket_client(from_name,from_port);
375   gras_msg_rpccall(sock,240,gras_msgtype_by_name("BW request"),&request, &result);
376   
377   if (sec)
378     *sec=result->sec;
379   if (bw)
380     *bw =result->bw;
381
382   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
383         from_name,from_port, to_name,to_port,
384         result->sec,((double)result->bw)/1024.0);
385
386   gras_socket_close(sock);
387   free(result);
388   free(request);
389 }
390
391 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
392                           void            *payload) {
393                           
394   /* specification of the test to run, and our answer */
395   bw_request_t request = *(bw_request_t*)payload;
396   bw_res_t result = xbt_new0(s_bw_res_t,1);
397   gras_socket_t peer,asker;
398
399   asker=gras_msg_cb_ctx_from(ctx);
400   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
401         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
402         request->host.name,request->host.port);
403   peer = gras_socket_client(request->host.name,request->host.port);
404   amok_bw_test(peer,
405                request->buf_size,request->exp_size,request->msg_size,
406                0,
407                &(result->sec),&(result->bw));
408
409   gras_msg_rpcreturn(240,ctx,&result);
410
411   gras_os_sleep(1);
412   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
413   free(request->host.name);
414   free(request);
415   free(result);
416   
417   return 1;
418 }
419
420 double * amok_bw_matrix(xbt_dynar_t hosts,
421                          int buf_size_bw, int exp_size_bw, int msg_size_bw) { 
422   double sec;
423   /* construct of matrixs for bandwith and Latency */
424
425
426   int i,j,len=xbt_dynar_length(hosts);
427
428   double *matrix_res = xbt_new0(double, len*len);
429   xbt_host_t h1,h2;
430
431   xbt_dynar_foreach (hosts,i,h1) {
432     xbt_dynar_foreach (hosts,j,h2) {
433       if (i!=j) {
434         /* Mesurements of Bandwidth */
435         amok_bw_request(h1->name,h1->port,h2->name,h2->port,
436                         buf_size_bw,exp_size_bw,msg_size_bw,&sec,&matrix_res[i*len + j]);
437       } 
438     }
439   }
440   return matrix_res;
441 }