Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Do use those wonderful RPC
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "amok/Bandwidth/bandwidth_private.h"
12 #include "gras/messages.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
15
16 static short _amok_bw_initialized = 0;
17
18 /** @brief module initialization; all participating nodes must run this */
19 void amok_bw_init(void) {
20   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
21
22   amok_base_init();
23
24   if (! _amok_bw_initialized) {
25         
26      /* Build the Bandwidth datatype descriptions */ 
27      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
28      gras_datadesc_struct_append(bw_request_desc,"host",
29                                  gras_datadesc_by_name("xbt_host_t"));
30      gras_datadesc_struct_append(bw_request_desc,"buf_size",
31                                  gras_datadesc_by_name("unsigned long int"));
32      gras_datadesc_struct_append(bw_request_desc,"exp_size",
33                                  gras_datadesc_by_name("unsigned long int"));
34      gras_datadesc_struct_append(bw_request_desc,"msg_size",
35                                  gras_datadesc_by_name("unsigned long int"));
36      gras_datadesc_struct_close(bw_request_desc);
37      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
38
39      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
40      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_close(bw_res_desc);
44      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
45
46      gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
47      gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
48
49      /* Build the saturation datatype descriptions */ 
50      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
51      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
52      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
53      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
54      gras_datadesc_struct_close(sat_request_desc);
55      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
56      
57      /* Register the saturation messages */
58      gras_msgtype_declare("SAT start",   sat_request_desc);
59      gras_msgtype_declare("SAT started", NULL);
60      gras_msgtype_declare("SAT begin",   sat_request_desc);
61      gras_msgtype_declare("SAT begun",   NULL);
62      gras_msgtype_declare("SAT end",     NULL);
63      gras_msgtype_declare("SAT ended",   NULL);
64      gras_msgtype_declare("SAT stop",    NULL);
65      gras_msgtype_declare("SAT stopped", NULL);
66   }
67    
68   /* Register the callbacks */
69   gras_cb_register(gras_msgtype_by_name("BW request"),
70                    &amok_bw_cb_bw_request);
71   gras_cb_register(gras_msgtype_by_name("BW handshake"),
72                    &amok_bw_cb_bw_handshake);
73
74   gras_cb_register(gras_msgtype_by_name("SAT start"),
75                    &amok_bw_cb_sat_start);
76   gras_cb_register(gras_msgtype_by_name("SAT begin"),
77                    &amok_bw_cb_sat_begin);
78   
79   _amok_bw_initialized =1;
80 }
81
82 /** @brief module finalization */
83 void amok_bw_exit(void) {
84   if (! _amok_bw_initialized)
85     return;
86    
87   gras_cb_unregister(gras_msgtype_by_name("BW request"),
88                      &amok_bw_cb_bw_request);
89   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
90                      &amok_bw_cb_bw_handshake);
91
92   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
93                      &amok_bw_cb_sat_start);
94   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
95                      &amok_bw_cb_sat_begin);
96
97   _amok_bw_initialized = 0;
98 }
99
100 /* ***************************************************************************
101  * Bandwidth tests
102  * ***************************************************************************/
103
104 /**
105  * \brief bandwidth measurement between localhost and \e peer
106  * 
107  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
108  * \arg buf_size: Size of the socket buffer
109  * \arg exp_size: Total size of data sent across the network
110  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
111  * \arg sec: where the result (in seconds) should be stored.
112  * \arg bw: observed Bandwidth (in byte/s) 
113  *
114  * Conduct a bandwidth test from the local process to the given peer.
115  * This call is blocking until the end of the experiment.
116  *
117  * Results are reported in last args, and sizes are in byte.
118  */
119 void amok_bw_test(gras_socket_t peer,
120                   unsigned long int buf_size,
121                   unsigned long int exp_size,
122                   unsigned long int msg_size,
123           /*OUT*/ double *sec, double *bw) {
124
125   /* Measurement sockets for the experiments */
126   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
127   int port;
128   bw_request_t request,request_ack;
129   xbt_ex_t e;
130   
131   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
132     TRY {
133       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
134     } CATCH(e) {
135       measMasterIn = NULL;
136       if (port == 10000 -1) {
137         RETHROW0("Error caught while opening a measurement socket: %s");
138       } else {
139         xbt_ex_free(e); 
140       }
141     }
142   }
143   
144   request=xbt_new0(s_bw_request_t,1);
145   request->buf_size=buf_size;
146   request->exp_size=exp_size;
147   request->msg_size=msg_size;
148   request->host.name = NULL;
149   request->host.port = gras_socket_my_port(measMasterIn);
150   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
151         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
152         buf_size,request->buf_size);
153
154   TRY {
155     gras_msg_rpccall(peer,60,
156                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
157   } CATCH(e) {
158     RETHROW0("Error encountered while sending the BW request: %s");
159   }
160   measIn = gras_socket_meas_accept(measMasterIn);
161    
162   TRY {
163     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
164                                    request_ack->host.port, 
165                                    request->buf_size,1);
166   } CATCH(e) {
167     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
168              gras_socket_peer_name(peer),request_ack->host.port);
169   }
170   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
171
172   *sec=gras_os_time();
173   TRY {
174     gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
175     gras_socket_meas_recv(measIn,120,1,1);
176   } CATCH(e) {
177     gras_socket_close(measOut);
178     gras_socket_close(measMasterIn);
179     gras_socket_close(measIn);
180     RETHROW0("Unable to conduct the experiment: %s");
181   }
182
183   *sec = gras_os_time() - *sec;
184   *bw = ((double)exp_size) / *sec;
185
186   free(request_ack);
187   free(request);
188   if (measIn != measMasterIn)
189     gras_socket_close(measIn);
190   gras_socket_close(measMasterIn);
191   gras_socket_close(measOut);
192 }
193
194
195 /* Callback to the "BW handshake" message: 
196    opens a server measurement socket,
197    indicate its port in an "BW handshaked" message,
198    receive the corresponding data on the measurement socket, 
199    close the measurment socket
200
201    sizes are in byte
202 */
203 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
204                             void          *payload) {
205   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
206   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
207   bw_request_t request=*(bw_request_t*)payload;
208   bw_request_t answer;
209   xbt_ex_t e;
210   int port;
211   
212   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
213         gras_socket_peer_name(expeditor),request->host.port,
214         request->buf_size,request->exp_size,request->msg_size);     
215
216   /* Build our answer */
217   answer = xbt_new0(s_bw_request_t,1);
218   
219   for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
220     TRY {
221       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
222     } CATCH(e) {
223       measMasterIn = NULL;
224       if (port < 10000)
225         xbt_ex_free(e);
226       else
227         /* FIXME: tell error to remote */
228         RETHROW0("Error encountered while opening a measurement server socket: %s");
229     }
230   }
231    
232   answer->buf_size=request->buf_size;
233   answer->exp_size=request->exp_size;
234   answer->msg_size=request->msg_size;
235   answer->host.port=gras_socket_my_port(measMasterIn);
236
237
238
239   TRY {
240     gras_msg_rpcreturn(60,ctx,&answer);
241   } CATCH(e) { 
242     gras_socket_close(measMasterIn);
243     /* FIXME: tell error to remote */
244     RETHROW0("Error encountered while sending the answer: %s");
245   }
246
247
248   /* Don't connect asap to leave time to other side to enter the accept() */
249   TRY {
250     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
251                                      request->host.port,
252                                      request->buf_size,1);
253   } CATCH(e) {
254     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
255              gras_socket_peer_name(expeditor),request->host.port);
256     /* FIXME: tell error to remote */
257   }
258
259   TRY {
260     measIn = gras_socket_meas_accept(measMasterIn);
261     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
262            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
263
264     gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
265     gras_socket_meas_send(measOut,120,1,1);
266   } CATCH(e) {
267     gras_socket_close(measMasterIn);
268     gras_socket_close(measIn);
269     gras_socket_close(measOut);
270     /* FIXME: tell error to remote ? */
271     RETHROW0("Error encountered while receiving the experiment: %s");
272   }
273
274   if (measIn != measMasterIn)
275     gras_socket_close(measMasterIn);
276   gras_socket_close(measIn);
277   gras_socket_close(measOut);
278   free(answer);
279   free(request);
280   DEBUG0("BW experiment done.");
281   return 1;
282 }
283
284 /**
285  * \brief request a bandwidth measurement between two remote hosts
286  *
287  * \arg from_name: Name of the first host 
288  * \arg from_port: port on which the first process is listening for messages
289  * \arg to_name: Name of the second host 
290  * \arg to_port: port on which the second process is listening (for messages, do not 
291  * give a measurement socket here. The needed measurement sockets will be created 
292  * automatically and negociated between the peers)
293  * \arg buf_size: Size of the socket buffer
294  * \arg exp_size: Total size of data sent across the network
295  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
296  * \arg sec: where the result (in seconds) should be stored.
297  * \arg bw: observed Bandwidth (in byte/s)
298  *
299  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
300  * This call is blocking until the end of the experiment.
301  *
302  * Results are reported in last args, and sizes are in bytes.
303  */
304 void amok_bw_request(const char* from_name,unsigned int from_port,
305                      const char* to_name,unsigned int to_port,
306                      unsigned long int buf_size,
307                      unsigned long int exp_size,
308                      unsigned long int msg_size,
309              /*OUT*/ double *sec, double*bw) {
310   
311   gras_socket_t sock;
312   /* The request */
313   bw_request_t request;
314   bw_res_t result;
315
316   request=xbt_new0(s_bw_request_t,1);
317   request->buf_size=buf_size;
318   request->exp_size=exp_size;
319   request->msg_size=msg_size;
320
321   request->host.name = (char*)to_name;
322   request->host.port = to_port;
323
324   sock = gras_socket_client(from_name,from_port);
325   gras_msg_rpccall(sock,240,gras_msgtype_by_name("BW request"),&request, &result);
326   
327   *sec=result->sec;
328   *bw =result->bw;
329
330   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
331         from_name,from_port, to_name,to_port,
332         *sec,*bw);
333
334   gras_socket_close(sock);
335   free(result);
336 }
337
338 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
339                           void            *payload) {
340                           
341   /* specification of the test to run, and our answer */
342   bw_request_t request = *(bw_request_t*)payload;
343   bw_res_t result = xbt_new0(s_bw_res,1);
344   gras_socket_t peer;
345
346   peer = gras_socket_client(request->host.name,request->host.port);
347   amok_bw_test(peer,
348                request->buf_size,request->exp_size,request->msg_size,
349                &(result->sec),&(result->bw));
350
351   gras_msg_rpcreturn(240,ctx,&result);
352
353   gras_os_sleep(1);
354   gras_socket_close(peer);
355   free(request);
356   free(result);
357   
358   return 1;
359 }
360
361 int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx,
362                          void             *payload) {
363    CRITICAL0("amok_bw_cb_sat_start; not implemented");
364    return 1;
365
366 int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx,
367                          void             *payload) {
368    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
369    return 1;
370 }