Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Do not force experiment sizes to be expressed in kb, or it becomes impossible to...
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "amok/Bandwidth/bandwidth_private.h"
12 #include "gras/messages.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
15
16 static short _amok_bw_initialized = 0;
17
18 /** @brief module initialization; all participating nodes must run this */
19 void amok_bw_init(void) {
20   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
21
22   amok_base_init();
23
24   if (! _amok_bw_initialized) {
25         
26      /* Build the datatype descriptions */ 
27      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
28      gras_datadesc_struct_append(bw_request_desc,"host",
29                                  gras_datadesc_by_name("xbt_host_t"));
30      gras_datadesc_struct_append(bw_request_desc,"buf_size",
31                                  gras_datadesc_by_name("unsigned long int"));
32      gras_datadesc_struct_append(bw_request_desc,"exp_size",
33                                  gras_datadesc_by_name("unsigned long int"));
34      gras_datadesc_struct_append(bw_request_desc,"msg_size",
35                                  gras_datadesc_by_name("unsigned long int"));
36      gras_datadesc_struct_close(bw_request_desc);
37      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
38
39      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
40      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_close(bw_res_desc);
44      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
45
46      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50      gras_datadesc_struct_close(sat_request_desc);
51      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
52      
53      /* Register the bandwidth messages */
54      gras_msgtype_declare("BW handshake",     bw_request_desc);
55      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
56      gras_msgtype_declare("BW request",       bw_request_desc);
57      gras_msgtype_declare("BW result",        bw_res_desc);
58      
59      /* Register the saturation messages */
60      gras_msgtype_declare("SAT start",   sat_request_desc);
61      gras_msgtype_declare("SAT started", NULL);
62      gras_msgtype_declare("SAT begin",   sat_request_desc);
63      gras_msgtype_declare("SAT begun",   NULL);
64      gras_msgtype_declare("SAT end",     NULL);
65      gras_msgtype_declare("SAT ended",   NULL);
66      gras_msgtype_declare("SAT stop",    NULL);
67      gras_msgtype_declare("SAT stopped", NULL);
68   }
69    
70   /* Register the callbacks */
71   gras_cb_register(gras_msgtype_by_name("BW request"),
72                    &amok_bw_cb_bw_request);
73   gras_cb_register(gras_msgtype_by_name("BW handshake"),
74                    &amok_bw_cb_bw_handshake);
75
76   gras_cb_register(gras_msgtype_by_name("SAT start"),
77                    &amok_bw_cb_sat_start);
78   gras_cb_register(gras_msgtype_by_name("SAT begin"),
79                    &amok_bw_cb_sat_begin);
80   
81   _amok_bw_initialized =1;
82 }
83
84 /** @brief module finalization */
85 void amok_bw_exit(void) {
86   if (! _amok_bw_initialized)
87     return;
88    
89   gras_cb_unregister(gras_msgtype_by_name("BW request"),
90                      &amok_bw_cb_bw_request);
91   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
92                      &amok_bw_cb_bw_handshake);
93
94   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
95                      &amok_bw_cb_sat_start);
96   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
97                      &amok_bw_cb_sat_begin);
98
99   _amok_bw_initialized = 0;
100 }
101
102 /* ***************************************************************************
103  * Bandwidth tests
104  * ***************************************************************************/
105
106 /**
107  * \brief bandwidth measurement between localhost and \e peer
108  * 
109  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
110  * \arg buf_size: Size of the socket buffer
111  * \arg exp_size: Total size of data sent across the network
112  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
113  * \arg sec: where the result (in seconds) should be stored.
114  * \arg bw: observed Bandwidth (in byte/s) 
115  *
116  * Conduct a bandwidth test from the local process to the given peer.
117  * This call is blocking until the end of the experiment.
118  *
119  * Results are reported in last args, and sizes are in byte.
120  */
121 void amok_bw_test(gras_socket_t peer,
122                   unsigned long int buf_size,
123                   unsigned long int exp_size,
124                   unsigned long int msg_size,
125           /*OUT*/ double *sec, double *bw) {
126
127   /* Measurement sockets for the experiments */
128   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
129   int port;
130   bw_request_t request,request_ack;
131   xbt_ex_t e;
132   
133   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
134     TRY {
135       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
136     } CATCH(e) {
137       measMasterIn = NULL;
138       if (port == 10000 -1) {
139         RETHROW0("Error caught while opening a measurement socket: %s");
140       } else {
141         xbt_ex_free(e); 
142       }
143     }
144   }
145   
146   request=xbt_new0(s_bw_request_t,1);
147   request->buf_size=buf_size;
148   request->exp_size=exp_size;
149   request->msg_size=msg_size;
150   request->host.name = NULL;
151   request->host.port = gras_socket_my_port(measMasterIn);
152   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
153         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
154         buf_size,request->buf_size);
155
156   TRY {
157     gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request);
158   } CATCH(e) {
159     RETHROW0("Error encountered while sending the BW request: %s");
160   }
161   measIn = gras_socket_meas_accept(measMasterIn);
162
163   TRY {
164     gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),NULL,&request_ack);
165   } CATCH(e) {
166     RETHROW0("Error encountered while waiting for the answer to BW request: %s");
167   }
168   
169   /* FIXME: What if there is a remote error? */
170    
171   TRY {
172     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
173                                    request_ack->host.port, 
174                                    request->buf_size,1);
175   } CATCH(e) {
176     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
177              gras_socket_peer_name(peer),request_ack->host.port);
178   }
179   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
180
181   *sec=gras_os_time();
182   TRY {
183     gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
184     gras_socket_meas_recv(measIn,120,1,1);
185   } CATCH(e) {
186     gras_socket_close(measOut);
187     gras_socket_close(measMasterIn);
188     gras_socket_close(measIn);
189     RETHROW0("Unable to conduct the experiment: %s");
190   }
191
192   *sec = gras_os_time() - *sec;
193   *bw = ((double)exp_size) / *sec;
194
195   free(request_ack);
196   free(request);
197   if (measIn != measMasterIn)
198     gras_socket_close(measIn);
199   gras_socket_close(measMasterIn);
200   gras_socket_close(measOut);
201 }
202
203
204 /* Callback to the "BW handshake" message: 
205    opens a server measurement socket,
206    indicate its port in an "BW handshaked" message,
207    receive the corresponding data on the measurement socket, 
208    close the measurment socket
209
210    sizes are in byte
211 */
212 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
213                             void          *payload) {
214   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
215   bw_request_t request=*(bw_request_t*)payload;
216   bw_request_t answer;
217   xbt_ex_t e;
218   int port;
219   
220   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
221         gras_socket_peer_name(expeditor),request->host.port,
222         request->buf_size,request->exp_size,request->msg_size);     
223
224   /* Build our answer */
225   answer = xbt_new0(s_bw_request_t,1);
226   
227   for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
228     TRY {
229       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
230     } CATCH(e) {
231       measMasterIn = NULL;
232       if (port < 10000)
233         xbt_ex_free(e);
234       else
235         /* FIXME: tell error to remote */
236         RETHROW0("Error encountered while opening a measurement server socket: %s");
237     }
238   }
239    
240   answer->buf_size=request->buf_size;
241   answer->exp_size=request->exp_size;
242   answer->msg_size=request->msg_size;
243   answer->host.port=gras_socket_my_port(measMasterIn);
244
245   /* Don't connect asap to leave time to other side to enter the accept() */
246   TRY {
247     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
248                                      request->host.port,
249                                      request->buf_size,1);
250   } CATCH(e) {
251     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
252              gras_socket_peer_name(expeditor),request->host.port);
253     /* FIXME: tell error to remote */
254   }
255
256   TRY {
257     gras_msg_send(expeditor, gras_msgtype_by_name("BW handshake ACK"), &answer);
258   } CATCH(e) { 
259     gras_socket_close(measMasterIn);
260     gras_socket_close(measOut);
261     /* FIXME: tell error to remote */
262     RETHROW0("Error encountered while sending the answer: %s");
263   }
264
265   TRY {
266     measIn = gras_socket_meas_accept(measMasterIn);
267     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
268            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
269
270     gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
271     gras_socket_meas_send(measOut,120,1,1);
272   } CATCH(e) {
273     gras_socket_close(measMasterIn);
274     gras_socket_close(measIn);
275     gras_socket_close(measOut);
276     /* FIXME: tell error to remote ? */
277     RETHROW0("Error encountered while receiving the experiment: %s");
278   }
279
280   if (measIn != measMasterIn)
281     gras_socket_close(measMasterIn);
282   gras_socket_close(measIn);
283   gras_socket_close(measOut);
284   free(answer);
285   free(request);
286   DEBUG0("BW experiment done.");
287   return 1;
288 }
289
290 /**
291  * \brief request a bandwidth measurement between two remote hosts
292  *
293  * \arg from_name: Name of the first host 
294  * \arg from_port: port on which the first process is listening for messages
295  * \arg to_name: Name of the second host 
296  * \arg to_port: port on which the second process is listening (for messages, do not 
297  * give a measurement socket here. The needed measurement sockets will be created 
298  * automatically and negociated between the peers)
299  * \arg buf_size: Size of the socket buffer
300  * \arg exp_size: Total size of data sent across the network
301  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
302  * \arg sec: where the result (in seconds) should be stored.
303  * \arg bw: observed Bandwidth (in byte/s)
304  *
305  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
306  * This call is blocking until the end of the experiment.
307  *
308  * Results are reported in last args, and sizes are in bytes.
309  */
310 void amok_bw_request(const char* from_name,unsigned int from_port,
311                      const char* to_name,unsigned int to_port,
312                      unsigned long int buf_size,
313                      unsigned long int exp_size,
314                      unsigned long int msg_size,
315              /*OUT*/ double *sec, double*bw) {
316   
317   gras_socket_t sock;
318   /* The request */
319   bw_request_t request;
320   bw_res_t result;
321
322   request=xbt_new0(s_bw_request_t,1);
323   request->buf_size=buf_size;
324   request->exp_size=exp_size;
325   request->msg_size=msg_size;
326
327   request->host.name = (char*)to_name;
328   request->host.port = to_port;
329
330   sock = gras_socket_client(from_name,from_port);
331   gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request);
332   free(request);
333
334   gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result);
335   
336   *sec=result->sec;
337   *bw =result->bw;
338
339   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
340         from_name,from_port, to_name,to_port,
341         *sec,*bw);
342
343   gras_socket_close(sock);
344   free(result);
345 }
346
347 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
348                           void            *payload) {
349                           
350   /* specification of the test to run, and our answer */
351   bw_request_t request = *(bw_request_t*)payload;
352   bw_res_t result = xbt_new0(s_bw_res,1);
353   gras_socket_t peer;
354
355   peer = gras_socket_client(request->host.name,request->host.port);
356   amok_bw_test(peer,
357                request->buf_size,request->exp_size,request->msg_size,
358                &(result->sec),&(result->bw));
359
360   gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result);
361
362   gras_os_sleep(1);
363   gras_socket_close(peer);
364   free(request);
365   free(result);
366   
367   return 1;
368 }
369
370 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
371                          void             *payload) {
372    CRITICAL0("amok_bw_cb_sat_start; not implemented");
373    return 1;
374
375 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
376                          void             *payload) {
377    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
378    return 1;
379 }