Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make sure that the code still compiles with the freaking paranoid gcc warning options...
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "amok/Bandwidth/bandwidth_private.h"
12 #include "gras/messages.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
15
16 static short _amok_bw_initialized = 0;
17
18 /** @brief module initialization; all participating nodes must run this */
19 void amok_bw_init(void) {
20   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
21
22   amok_base_init();
23
24   if (! _amok_bw_initialized) {
25         
26      /* Build the datatype descriptions */ 
27      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
28      gras_datadesc_struct_append(bw_request_desc,"host",
29                                  gras_datadesc_by_name("xbt_host_t"));
30      gras_datadesc_struct_append(bw_request_desc,"buf_size",
31                                  gras_datadesc_by_name("unsigned long int"));
32      gras_datadesc_struct_append(bw_request_desc,"exp_size",
33                                  gras_datadesc_by_name("unsigned long int"));
34      gras_datadesc_struct_append(bw_request_desc,"msg_size",
35                                  gras_datadesc_by_name("unsigned long int"));
36      gras_datadesc_struct_close(bw_request_desc);
37      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
38
39      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
40      gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
41      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
42      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
44      gras_datadesc_struct_close(bw_res_desc);
45      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
46
47      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
48      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
49      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
50      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
51      gras_datadesc_struct_close(sat_request_desc);
52      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
53      
54      /* Register the bandwidth messages */
55      gras_msgtype_declare("BW handshake",     bw_request_desc);
56      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
57      gras_msgtype_declare("BW request",       bw_request_desc);
58      gras_msgtype_declare("BW result",        bw_res_desc);
59      
60      /* Register the saturation messages */
61      gras_msgtype_declare("SAT start",   sat_request_desc);
62      gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
63      gras_msgtype_declare("SAT begin",   sat_request_desc);
64      gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
65      gras_msgtype_declare("SAT end",     NULL);
66      gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
67      gras_msgtype_declare("SAT stop",    NULL);
68      gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
69   }
70    
71   /* Register the callbacks */
72   gras_cb_register(gras_msgtype_by_name("BW request"),
73                    &amok_bw_cb_bw_request);
74   gras_cb_register(gras_msgtype_by_name("BW handshake"),
75                    &amok_bw_cb_bw_handshake);
76
77   gras_cb_register(gras_msgtype_by_name("SAT start"),
78                    &amok_bw_cb_sat_start);
79   gras_cb_register(gras_msgtype_by_name("SAT begin"),
80                    &amok_bw_cb_sat_begin);
81   
82   _amok_bw_initialized =1;
83 }
84
85 /** @brief module finalization */
86 void amok_bw_exit(void) {
87   if (! _amok_bw_initialized)
88     return;
89    
90   gras_cb_unregister(gras_msgtype_by_name("BW request"),
91                      &amok_bw_cb_bw_request);
92   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
93                      &amok_bw_cb_bw_handshake);
94
95   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
96                      &amok_bw_cb_sat_start);
97   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
98                      &amok_bw_cb_sat_begin);
99
100   _amok_bw_initialized = 0;
101 }
102
103 /* ***************************************************************************
104  * Bandwidth tests
105  * ***************************************************************************/
106
107 /**
108  * \brief bandwidth measurement between localhost and \e peer
109  * 
110  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
111  * \arg buf_size: Size of the socket buffer
112  * \arg exp_size: Total size of data sent across the network
113  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
114  * \arg sec: where the result (in seconds) should be stored.
115  * \arg bw: observed Bandwidth (in kb/s) 
116  *
117  * Conduct a bandwidth test from the local process to the given peer.
118  * This call is blocking until the end of the experiment.
119  *
120  * Results are reported in last args, and sizes are in kb.
121  */
122 void amok_bw_test(gras_socket_t peer,
123                   unsigned long int buf_size,
124                   unsigned long int exp_size,
125                   unsigned long int msg_size,
126           /*OUT*/ double *sec, double *bw) {
127
128   /* Measurement sockets for the experiments */
129   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
130   int port;
131   bw_request_t request,request_ack;
132   xbt_ex_t e;
133   
134   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
135     TRY {
136       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
137     } CATCH(e) {
138       measMasterIn = NULL;
139       if (port < 10000) {
140         xbt_ex_free(e);
141       } else {
142         RETHROW0("Error caught while opening a measurement socket: %s");
143       }
144     }
145   }
146   
147   request=xbt_new0(s_bw_request_t,1);
148   request->buf_size=buf_size*1024;
149   request->exp_size=exp_size*1024;
150   request->msg_size=msg_size*1024;
151   request->host.name = NULL;
152   request->host.port = gras_socket_my_port(measMasterIn);
153   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)", 
154         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
155         buf_size,request->buf_size);
156
157   TRY {
158     gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request);
159   } CATCH(e) {
160     RETHROW0("Error encountered while sending the BW request: %s");
161   }
162   measIn = gras_socket_meas_accept(measMasterIn);
163
164   TRY {
165     gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),NULL,&request_ack);
166   } CATCH(e) {
167     RETHROW0("Error encountered while waiting for the answer to BW request: %s");
168   }
169   
170   /* FIXME: What if there is a remote error? */
171    
172   TRY {
173     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174                                    request_ack->host.port, 
175                                    request->buf_size,1);
176   } CATCH(e) {
177     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178              gras_socket_peer_name(peer),request_ack->host.port);
179   }
180   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
181
182   *sec=gras_os_time();
183   TRY {
184     gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
185     gras_socket_meas_recv(measIn,120,1,1);
186   } CATCH(e) {
187     gras_socket_close(measOut);
188     gras_socket_close(measMasterIn);
189     gras_socket_close(measIn);
190     RETHROW0("Unable to conduct the experiment: %s");
191   }
192
193   *sec = gras_os_time() - *sec;
194   *bw = ((double)exp_size) / *sec;
195
196   free(request_ack);
197   free(request);
198   if (measIn != measMasterIn)
199     gras_socket_close(measIn);
200   gras_socket_close(measMasterIn);
201   gras_socket_close(measOut);
202 }
203
204
205 /* Callback to the "BW handshake" message: 
206    opens a server measurement socket,
207    indicate its port in an "BW handshaked" message,
208    receive the corresponding data on the measurement socket, 
209    close the measurment socket
210
211    sizes are in byte (got converted from kb my expeditor)
212 */
213 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
214                             void          *payload) {
215   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
216   bw_request_t request=*(bw_request_t*)payload;
217   bw_request_t answer;
218   xbt_ex_t e;
219   int port;
220   
221   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
222         gras_socket_peer_name(expeditor),request->host.port,
223         request->buf_size,request->exp_size,request->msg_size);     
224
225   /* Build our answer */
226   answer = xbt_new0(s_bw_request_t,1);
227   
228   for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
229     TRY {
230       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
231     } CATCH(e) {
232       measMasterIn = NULL;
233       if (port < 10000)
234         xbt_ex_free(e);
235       else
236         /* FIXME: tell error to remote */
237         RETHROW0("Error encountered while opening a measurement server socket: %s");
238     }
239   }
240    
241   answer->buf_size=request->buf_size;
242   answer->exp_size=request->exp_size;
243   answer->msg_size=request->msg_size;
244   answer->host.port=gras_socket_my_port(measMasterIn);
245
246   /* Don't connect asap to leave time to other side to enter the accept() */
247   TRY {
248     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
249                                      request->host.port,
250                                      request->buf_size,1);
251   } CATCH(e) {
252     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
253              gras_socket_peer_name(expeditor),request->host.port);
254     /* FIXME: tell error to remote */
255   }
256
257   TRY {
258     gras_msg_send(expeditor, gras_msgtype_by_name("BW handshake ACK"), &answer);
259   } CATCH(e) { 
260     gras_socket_close(measMasterIn);
261     gras_socket_close(measOut);
262     /* FIXME: tell error to remote */
263     RETHROW0("Error encountered while sending the answer: %s");
264   }
265
266   TRY {
267     measIn = gras_socket_meas_accept(measMasterIn);
268     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
269            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
270
271     gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
272     gras_socket_meas_send(measOut,120,1,1);
273   } CATCH(e) {
274     gras_socket_close(measMasterIn);
275     gras_socket_close(measIn);
276     gras_socket_close(measOut);
277     /* FIXME: tell error to remote ? */
278     RETHROW0("Error encountered while receiving the experiment: %s");
279   }
280
281   if (measIn != measMasterIn)
282     gras_socket_close(measMasterIn);
283   gras_socket_close(measIn);
284   gras_socket_close(measOut);
285   free(answer);
286   free(request);
287   DEBUG0("BW experiment done.");
288   return 1;
289 }
290
291 /**
292  * \brief request a bandwidth measurement between two remote hosts
293  *
294  * \arg from_name: Name of the first host 
295  * \arg from_port: port on which the first process is listening for messages
296  * \arg to_name: Name of the second host 
297  * \arg to_port: port on which the second process is listening (for messages, do not 
298  * give a measurement socket here. The needed measurement sockets will be created 
299  * automatically and negociated between the peers)
300  * \arg buf_size: Size of the socket buffer
301  * \arg exp_size: Total size of data sent across the network
302  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
303  * \arg sec: where the result (in seconds) should be stored.
304  * \arg bw: observed Bandwidth (in kb/s)
305  *
306  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
307  * This call is blocking until the end of the experiment.
308  *
309  * Results are reported in last args, and sizes are in kb.
310  */
311 void amok_bw_request(const char* from_name,unsigned int from_port,
312                      const char* to_name,unsigned int to_port,
313                      unsigned long int buf_size,
314                      unsigned long int exp_size,
315                      unsigned long int msg_size,
316              /*OUT*/ double *sec, double*bw) {
317   
318   gras_socket_t sock;
319   /* The request */
320   bw_request_t request;
321   bw_res_t result;
322
323   request=xbt_new0(s_bw_request_t,1);
324   request->buf_size=buf_size;
325   request->exp_size=exp_size;
326   request->msg_size=msg_size;
327
328   request->host.name = (char*)to_name;
329   request->host.port = to_port;
330
331   sock = gras_socket_client(from_name,from_port);
332   gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request);
333   free(request);
334
335   gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result);
336   
337   *sec=result->sec;
338   *bw =result->bw;
339
340   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
341         from_name,from_port, to_name,to_port,
342         *sec,*bw);
343
344   gras_socket_close(sock);
345   free(result);
346 }
347
348 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
349                           void            *payload) {
350                           
351   /* specification of the test to run, and our answer */
352   bw_request_t request = *(bw_request_t*)payload;
353   bw_res_t result = xbt_new0(s_bw_res,1);
354   gras_socket_t peer;
355
356   peer = gras_socket_client(request->host.name,request->host.port);
357   amok_bw_test(peer,
358                request->buf_size,request->exp_size,request->msg_size,
359                &(result->sec),&(result->bw));
360
361   gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result);
362
363   gras_os_sleep(1);
364   gras_socket_close(peer);
365   free(request);
366   free(result);
367   
368   return 1;
369 }
370
371 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
372                          void             *payload) {
373    CRITICAL0("amok_bw_cb_sat_start; not implemented");
374    return 1;
375
376 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
377                          void             *payload) {
378    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
379    return 1;
380 }