Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
allow to request remote measurement with a min_duration (that was easy, apparently)
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   amok_base_init();
29
30   if (! _amok_bw_initialized) {
31     amok_bw_bw_init();
32     amok_bw_sat_init();
33   }
34    
35   amok_bw_bw_join();
36   amok_bw_sat_join();
37
38   _amok_bw_initialized++;
39 }
40
41 /** @brief module finalization */
42 void amok_bw_exit(void) {
43   if (! _amok_bw_initialized)
44     return;
45    
46   amok_bw_bw_leave();
47   amok_bw_sat_leave();
48
49   _amok_bw_initialized--;
50 }
51
52 /* ***************************************************************************
53  * Bandwidth tests
54  * ***************************************************************************/
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
57
58 void amok_bw_bw_init() {
59   gras_datadesc_type_t bw_request_desc, bw_res_desc;
60
61   /* Build the Bandwidth datatype descriptions */ 
62   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63   gras_datadesc_struct_append(bw_request_desc,"host",
64                               gras_datadesc_by_name("s_xbt_host_t"));
65   gras_datadesc_struct_append(bw_request_desc,"buf_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"exp_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"msg_size",
70                               gras_datadesc_by_name("unsigned long int"));
71   gras_datadesc_struct_append(bw_request_desc,"min_duration",
72                               gras_datadesc_by_name("double"));
73   gras_datadesc_struct_close(bw_request_desc);
74   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
75   
76   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
77   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
78   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
79   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
80   gras_datadesc_struct_close(bw_res_desc);
81   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
82   
83   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
84
85   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
86   gras_msgtype_declare("BW stop", NULL);
87
88   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
89 }
90 void amok_bw_bw_join() {
91   gras_cb_register(gras_msgtype_by_name("BW request"),
92                    &amok_bw_cb_bw_request);
93   gras_cb_register(gras_msgtype_by_name("BW handshake"),
94                    &amok_bw_cb_bw_handshake);
95 }
96 void amok_bw_bw_leave() {
97   gras_cb_unregister(gras_msgtype_by_name("BW request"),
98                      &amok_bw_cb_bw_request);
99   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
100                      &amok_bw_cb_bw_handshake);
101 }
102
103 /**
104  * \brief bandwidth measurement between localhost and \e peer
105  * 
106  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
107  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
108  * \arg exp_size: Total size of data sent across the network
109  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
110  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
111  * \arg sec: where the result (in seconds) should be stored.
112  * \arg bw: observed Bandwidth (in byte/s) 
113  *
114  * Conduct a bandwidth test from the local process to the given peer.
115  * This call is blocking until the end of the experiment.
116  *
117  * Results are reported in last args, and sizes are in byte.
118  */
119 void amok_bw_test(gras_socket_t peer,
120                   unsigned long int buf_size,
121                   unsigned long int exp_size,
122                   unsigned long int msg_size,
123                   double min_duration,
124           /*OUT*/ double *sec, double *bw) {
125
126   /* Measurement sockets for the experiments */
127   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
128   int port;
129   bw_request_t request,request_ack;
130   xbt_ex_t e;
131   
132   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
133     TRY {
134       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
135     } CATCH(e) {
136       measMasterIn = NULL;
137       if (port == 10000 -1) {
138         RETHROW0("Error caught while opening a measurement socket: %s");
139       } else {
140         xbt_ex_free(e); 
141       }
142     }
143   }
144   
145   request=xbt_new0(s_bw_request_t,1);
146   request->buf_size=buf_size;
147   request->exp_size=exp_size;
148   request->msg_size=msg_size;
149   request->host.name = NULL;
150   request->host.port = gras_socket_my_port(measMasterIn);
151   DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
152         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
153         buf_size,request->buf_size);
154
155   TRY {
156     gras_msg_rpccall(peer,60,
157                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
158   } CATCH(e) {
159     RETHROW0("Error encountered while sending the BW request: %s");
160   }
161   measIn = gras_socket_meas_accept(measMasterIn);
162    
163   TRY {
164     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
165                                    request_ack->host.port, 
166                                    request->buf_size,1);
167   } CATCH(e) {
168     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
169              gras_socket_peer_name(peer),request_ack->host.port);
170   }
171   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
172
173   *sec = 0;
174   do {
175     if (*sec>0) {
176       double meas_duration=*sec;
177       request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
178
179       DEBUG4("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld (got %fkb/s)",
180              meas_duration,min_duration,request->exp_size,((double)exp_size) / *sec/1024);
181       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
182     }
183
184     *sec=gras_os_time();
185     TRY {
186       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
187       DEBUG0("Data sent. Wait ACK");
188       gras_socket_meas_recv(measIn,120,1,1);
189     } CATCH(e) {
190       gras_socket_close(measOut);
191       gras_socket_close(measMasterIn);
192       gras_socket_close(measIn);
193       RETHROW0("Unable to conduct the experiment: %s");
194     }
195     DEBUG0("Experiment done");
196
197     *sec = gras_os_time() - *sec;
198     *bw = ((double)exp_size) / *sec;
199   } while (*sec < min_duration);
200
201   DEBUG0("This measurement was long enough. Stop peer");
202   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
203
204   free(request_ack);
205   free(request);
206   if (measIn != measMasterIn)
207     gras_socket_close(measIn);
208   gras_socket_close(measMasterIn);
209   gras_socket_close(measOut);
210 }
211
212
213 /* Callback to the "BW handshake" message: 
214    opens a server measurement socket,
215    indicate its port in an "BW handshaked" message,
216    receive the corresponding data on the measurement socket, 
217    close the measurment socket
218
219    sizes are in byte
220 */
221 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
222                             void          *payload) {
223   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
224   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
225   bw_request_t request=*(bw_request_t*)payload;
226   bw_request_t answer;
227   xbt_ex_t e;
228   int port;
229   int tooshort = 1;
230   gras_msg_cb_ctx_t ctx_reask;
231   static xbt_dynar_t msgtwaited=NULL;
232   
233   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
234         gras_socket_peer_name(expeditor),request->host.port,
235         request->buf_size,request->exp_size,request->msg_size);     
236
237   /* Build our answer */
238   answer = xbt_new0(s_bw_request_t,1);
239   
240   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
241     TRY {
242       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
243     } CATCH(e) {
244       measMasterIn = NULL;
245       if (port < 10000)
246         xbt_ex_free(e);
247       else
248         /* FIXME: tell error to remote */
249         RETHROW0("Error encountered while opening a measurement server socket: %s");
250     }
251   }
252    
253   answer->buf_size=request->buf_size;
254   answer->exp_size=request->exp_size;
255   answer->msg_size=request->msg_size;
256   answer->host.port=gras_socket_my_port(measMasterIn);
257
258   TRY {
259     gras_msg_rpcreturn(60,ctx,&answer);
260   } CATCH(e) { 
261     gras_socket_close(measMasterIn);
262     /* FIXME: tell error to remote */
263     RETHROW0("Error encountered while sending the answer: %s");
264   }
265
266
267   /* Don't connect asap to leave time to other side to enter the accept() */
268   TRY {
269     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
270                                      request->host.port,
271                                      request->buf_size,1);
272   } CATCH(e) {
273     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
274              gras_socket_peer_name(expeditor),request->host.port);
275     /* FIXME: tell error to remote */
276   }
277
278   TRY {
279     measIn = gras_socket_meas_accept(measMasterIn);
280     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
281            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
282   } CATCH(e) {
283     gras_socket_close(measMasterIn);
284     gras_socket_close(measIn);
285     gras_socket_close(measOut);
286     /* FIXME: tell error to remote ? */
287     RETHROW0("Error encountered while opening the meas socket: %s");
288   }
289
290   if (!msgtwaited) {
291     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
292     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
293     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
294   }
295
296   while (tooshort) {
297     void *payload;
298     int msggot;
299     TRY {
300       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
301       gras_socket_meas_send(measOut,120,1,1);
302       DEBUG0("ACK sent");
303     } CATCH(e) {
304       gras_socket_close(measMasterIn);
305       gras_socket_close(measIn);
306       gras_socket_close(measOut);
307       /* FIXME: tell error to remote ? */
308       RETHROW0("Error encountered while receiving the experiment: %s");
309     }
310     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
311     switch(msggot) {
312     case 0: /* BW stop */
313       tooshort = 0;
314       break;
315     case 1: /* BW reask */
316       tooshort = 1;
317       free(request);
318       request = (bw_request_t)payload;
319       gras_msg_rpcreturn(60,ctx_reask,NULL);
320     }
321     gras_msg_cb_ctx_free(ctx_reask);
322   }
323
324   if (measIn != measMasterIn)
325     gras_socket_close(measMasterIn);
326   gras_socket_close(measIn);
327   gras_socket_close(measOut);
328   free(answer);
329   free(request);
330   VERB0("BW experiment done.");
331   return 1;
332 }
333
334 /**
335  * \brief request a bandwidth measurement between two remote hosts
336  *
337  * \arg from_name: Name of the first host 
338  * \arg from_port: port on which the first process is listening for messages
339  * \arg to_name: Name of the second host 
340  * \arg to_port: port on which the second process is listening (for messages, do not 
341  * give a measurement socket here. The needed measurement sockets will be created 
342  * automatically and negociated between the peers)
343  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
344  * \arg exp_size: Total size of data sent across the network
345  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
346  * \arg sec: where the result (in seconds) should be stored.
347  * \arg bw: observed Bandwidth (in byte/s)
348  *
349  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
350  * This call is blocking until the end of the experiment.
351  *
352  * Results are reported in last args, and sizes are in bytes.
353  */
354 void amok_bw_request(const char* from_name,unsigned int from_port,
355                      const char* to_name,unsigned int to_port,
356                      unsigned long int buf_size,
357                      unsigned long int exp_size,
358                      unsigned long int msg_size,
359                      double min_duration,
360              /*OUT*/ double *sec, double*bw) {
361   
362   gras_socket_t sock;
363   /* The request */
364   bw_request_t request;
365   bw_res_t result;
366
367   request=xbt_new0(s_bw_request_t,1);
368   request->buf_size=buf_size;
369   request->exp_size=exp_size;
370   request->msg_size=msg_size;
371   request->min_duration = min_duration;
372
373   request->host.name = (char*)to_name;
374   request->host.port = to_port;
375
376   sock = gras_socket_client(from_name,from_port);
377   gras_msg_rpccall(sock,240,gras_msgtype_by_name("BW request"),&request, &result);
378   
379   if (sec)
380     *sec=result->sec;
381   if (bw)
382     *bw =result->bw;
383
384   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
385         from_name,from_port, to_name,to_port,
386         result->sec,((double)result->bw)/1024.0);
387
388   gras_socket_close(sock);
389   free(result);
390   free(request);
391 }
392
393 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
394                           void            *payload) {
395                           
396   /* specification of the test to run, and our answer */
397   bw_request_t request = *(bw_request_t*)payload;
398   bw_res_t result = xbt_new0(s_bw_res_t,1);
399   gras_socket_t peer,asker;
400
401   asker=gras_msg_cb_ctx_from(ctx);
402   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
403         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
404         request->host.name,request->host.port);
405   peer = gras_socket_client(request->host.name,request->host.port);
406   amok_bw_test(peer,
407                request->buf_size,request->exp_size,request->msg_size,
408                request->min_duration,
409                &(result->sec),&(result->bw));
410
411   gras_msg_rpcreturn(240,ctx,&result);
412
413   gras_os_sleep(1);
414   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
415   free(request->host.name);
416   free(request);
417   free(result);
418   
419   return 1;
420 }
421
422 double * amok_bw_matrix(xbt_dynar_t hosts,
423                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
424                         double min_duration) { 
425   double sec;
426   /* construct of matrixs for bandwith and Latency */
427
428
429   int i,j,len=xbt_dynar_length(hosts);
430
431   double *matrix_res = xbt_new0(double, len*len);
432   xbt_host_t h1,h2;
433
434   xbt_dynar_foreach (hosts,i,h1) {
435     xbt_dynar_foreach (hosts,j,h2) {
436       if (i!=j) {
437         /* Mesurements of Bandwidth */
438         amok_bw_request(h1->name,h1->port,h2->name,h2->port,
439                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
440                         &sec,&matrix_res[i*len + j]);
441       } 
442     }
443   }
444   return matrix_res;
445 }