Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Plug a memleak and shut it up
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   amok_base_init();
29
30   if (! _amok_bw_initialized) {
31     amok_bw_bw_init();
32     amok_bw_sat_init();
33   }
34    
35   amok_bw_bw_join();
36   amok_bw_sat_join();
37
38   _amok_bw_initialized++;
39 }
40
41 /** @brief module finalization */
42 void amok_bw_exit(void) {
43   if (! _amok_bw_initialized)
44     return;
45    
46   amok_bw_bw_leave();
47   amok_bw_sat_leave();
48
49   _amok_bw_initialized--;
50 }
51
52 /* ***************************************************************************
53  * Bandwidth tests
54  * ***************************************************************************/
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
57
58 void amok_bw_bw_init() {
59   gras_datadesc_type_t bw_request_desc, bw_res_desc;
60
61   /* Build the Bandwidth datatype descriptions */ 
62   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63   gras_datadesc_struct_append(bw_request_desc,"host",
64                               gras_datadesc_by_name("s_xbt_host_t"));
65   gras_datadesc_struct_append(bw_request_desc,"buf_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"exp_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"msg_size",
70                               gras_datadesc_by_name("unsigned long int"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
83 }
84 void amok_bw_bw_join() {
85   gras_cb_register(gras_msgtype_by_name("BW request"),
86                    &amok_bw_cb_bw_request);
87   gras_cb_register(gras_msgtype_by_name("BW handshake"),
88                    &amok_bw_cb_bw_handshake);
89 }
90 void amok_bw_bw_leave() {
91   gras_cb_unregister(gras_msgtype_by_name("BW request"),
92                      &amok_bw_cb_bw_request);
93   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
94                      &amok_bw_cb_bw_handshake);
95 }
96
97 /**
98  * \brief bandwidth measurement between localhost and \e peer
99  * 
100  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
101  * \arg buf_size: Size of the socket buffer
102  * \arg exp_size: Total size of data sent across the network
103  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
104  * \arg sec: where the result (in seconds) should be stored.
105  * \arg bw: observed Bandwidth (in byte/s) 
106  *
107  * Conduct a bandwidth test from the local process to the given peer.
108  * This call is blocking until the end of the experiment.
109  *
110  * Results are reported in last args, and sizes are in byte.
111  */
112 void amok_bw_test(gras_socket_t peer,
113                   unsigned long int buf_size,
114                   unsigned long int exp_size,
115                   unsigned long int msg_size,
116           /*OUT*/ double *sec, double *bw) {
117
118   /* Measurement sockets for the experiments */
119   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
120   int port;
121   bw_request_t request,request_ack;
122   xbt_ex_t e;
123   
124   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
125     TRY {
126       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
127     } CATCH(e) {
128       measMasterIn = NULL;
129       if (port == 10000 -1) {
130         RETHROW0("Error caught while opening a measurement socket: %s");
131       } else {
132         xbt_ex_free(&e);        
133       }
134     }
135   }
136   
137   request=xbt_new0(s_bw_request_t,1);
138   request->buf_size=buf_size;
139   request->exp_size=exp_size;
140   request->msg_size=msg_size;
141   request->host.name = NULL;
142   request->host.port = gras_socket_my_port(measMasterIn);
143   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
144         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
145         buf_size,request->buf_size);
146
147   TRY {
148     gras_msg_rpccall(peer,60,
149                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
150   } CATCH(e) {
151     RETHROW0("Error encountered while sending the BW request: %s");
152   }
153   measIn = gras_socket_meas_accept(measMasterIn);
154    
155   TRY {
156     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
157                                    request_ack->host.port, 
158                                    request->buf_size,1);
159   } CATCH(e) {
160     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
161              gras_socket_peer_name(peer),request_ack->host.port);
162   }
163   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
164
165   *sec=gras_os_time();
166   TRY {
167     gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
168     gras_socket_meas_recv(measIn,120,1,1);
169   } CATCH(e) {
170     gras_socket_close(measOut);
171     gras_socket_close(measMasterIn);
172     gras_socket_close(measIn);
173     RETHROW0("Unable to conduct the experiment: %s");
174   }
175
176   *sec = gras_os_time() - *sec;
177   *bw = ((double)exp_size) / *sec;
178
179   free(request_ack);
180   free(request);
181   if (measIn != measMasterIn)
182     gras_socket_close(measIn);
183   gras_socket_close(measMasterIn);
184   gras_socket_close(measOut);
185 }
186
187
188 /* Callback to the "BW handshake" message: 
189    opens a server measurement socket,
190    indicate its port in an "BW handshaked" message,
191    receive the corresponding data on the measurement socket, 
192    close the measurment socket
193
194    sizes are in byte
195 */
196 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
197                             void          *payload) {
198   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
199   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
200   bw_request_t request=*(bw_request_t*)payload;
201   bw_request_t answer;
202   xbt_ex_t e;
203   int port;
204   
205   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
206         gras_socket_peer_name(expeditor),request->host.port,
207         request->buf_size,request->exp_size,request->msg_size);     
208
209   /* Build our answer */
210   answer = xbt_new0(s_bw_request_t,1);
211   
212   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
213     TRY {
214       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
215     } CATCH(e) {
216       measMasterIn = NULL;
217       if (port < 10000)
218         xbt_ex_free(&e);
219       else
220         /* FIXME: tell error to remote */
221         RETHROW0("Error encountered while opening a measurement server socket: %s");
222     }
223   }
224    
225   answer->buf_size=request->buf_size;
226   answer->exp_size=request->exp_size;
227   answer->msg_size=request->msg_size;
228   answer->host.port=gras_socket_my_port(measMasterIn);
229
230   TRY {
231     gras_msg_rpcreturn(60,ctx,&answer);
232   } CATCH(e) { 
233     gras_socket_close(measMasterIn);
234     /* FIXME: tell error to remote */
235     RETHROW0("Error encountered while sending the answer: %s");
236   }
237
238
239   /* Don't connect asap to leave time to other side to enter the accept() */
240   TRY {
241     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
242                                      request->host.port,
243                                      request->buf_size,1);
244   } CATCH(e) {
245     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
246              gras_socket_peer_name(expeditor),request->host.port);
247     /* FIXME: tell error to remote */
248   }
249
250   TRY {
251     measIn = gras_socket_meas_accept(measMasterIn);
252     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
253            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
254
255     gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
256     gras_socket_meas_send(measOut,120,1,1);
257   } CATCH(e) {
258     gras_socket_close(measMasterIn);
259     gras_socket_close(measIn);
260     gras_socket_close(measOut);
261     /* FIXME: tell error to remote ? */
262     RETHROW0("Error encountered while receiving the experiment: %s");
263   }
264
265   if (measIn != measMasterIn)
266     gras_socket_close(measMasterIn);
267   gras_socket_close(measIn);
268   gras_socket_close(measOut);
269   free(answer);
270   free(request);
271   DEBUG0("BW experiment done.");
272   return 1;
273 }
274
275 /**
276  * \brief request a bandwidth measurement between two remote hosts
277  *
278  * \arg from_name: Name of the first host 
279  * \arg from_port: port on which the first process is listening for messages
280  * \arg to_name: Name of the second host 
281  * \arg to_port: port on which the second process is listening (for messages, do not 
282  * give a measurement socket here. The needed measurement sockets will be created 
283  * automatically and negociated between the peers)
284  * \arg buf_size: Size of the socket buffer
285  * \arg exp_size: Total size of data sent across the network
286  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
287  * \arg sec: where the result (in seconds) should be stored.
288  * \arg bw: observed Bandwidth (in byte/s)
289  *
290  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
291  * This call is blocking until the end of the experiment.
292  *
293  * Results are reported in last args, and sizes are in bytes.
294  */
295 void amok_bw_request(const char* from_name,unsigned int from_port,
296                      const char* to_name,unsigned int to_port,
297                      unsigned long int buf_size,
298                      unsigned long int exp_size,
299                      unsigned long int msg_size,
300              /*OUT*/ double *sec, double*bw) {
301   
302   gras_socket_t sock;
303   /* The request */
304   bw_request_t request;
305   bw_res_t result;
306
307   request=xbt_new0(s_bw_request_t,1);
308   request->buf_size=buf_size;
309   request->exp_size=exp_size;
310   request->msg_size=msg_size;
311
312   request->host.name = (char*)to_name;
313   request->host.port = to_port;
314
315   sock = gras_socket_client(from_name,from_port);
316   gras_msg_rpccall(sock,240,gras_msgtype_by_name("BW request"),&request, &result);
317   
318   if (sec)
319     *sec=result->sec;
320   if (bw)
321     *bw =result->bw;
322
323   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
324         from_name,from_port, to_name,to_port,
325         *sec,((double)*bw)/1024.0);
326
327   gras_socket_close(sock);
328   free(result);
329   free(request);
330 }
331
332 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
333                           void            *payload) {
334                           
335   /* specification of the test to run, and our answer */
336   bw_request_t request = *(bw_request_t*)payload;
337   bw_res_t result = xbt_new0(s_bw_res_t,1);
338   gras_socket_t peer;
339
340   peer = gras_socket_client(request->host.name,request->host.port);
341   amok_bw_test(peer,
342                request->buf_size,request->exp_size,request->msg_size,
343                &(result->sec),&(result->bw));
344
345   gras_msg_rpcreturn(240,ctx,&result);
346
347   gras_os_sleep(1);
348   gras_socket_close(peer);
349   free(request->host.name);
350   free(request);
351   free(result);
352   
353   return 1;
354 }
355
356 double * amok_bw_matrix(xbt_dynar_t hosts,
357                          int buf_size_bw, int exp_size_bw, int msg_size_bw) { 
358   double sec;
359   /* construct of matrixs for bandwith and Latency */
360
361
362   int i,j,len=xbt_dynar_length(hosts);
363
364   double *matrix_res = xbt_new0(double, len*len);
365   xbt_host_t h1,h2;
366
367   xbt_dynar_foreach (hosts,i,h1) {
368     xbt_dynar_foreach (hosts,j,h2) {
369       if (i!=j) {
370         /* Mesurements of Bandwidth */
371         amok_bw_request(h1->name,h1->port,h2->name,h2->port,
372                         buf_size_bw,exp_size_bw,msg_size_bw,&sec,&matrix_res[i*len + j]);
373       } 
374     }
375   }
376   return matrix_res;
377 }