Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Sizes are now in bytes in GRAS & AMOK (plus 'the thing we live for is debuging')
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/ex.h"
11 #include "amok/Bandwidth/bandwidth_private.h"
12 #include "gras/messages.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
15
16 static short _amok_bw_initialized = 0;
17
18 /** @brief module initialization; all participating nodes must run this */
19 void amok_bw_init(void) {
20   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
21
22   amok_base_init();
23
24   if (! _amok_bw_initialized) {
25         
26      /* Build the datatype descriptions */ 
27      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
28      gras_datadesc_struct_append(bw_request_desc,"host",
29                                  gras_datadesc_by_name("xbt_host_t"));
30      gras_datadesc_struct_append(bw_request_desc,"buf_size",
31                                  gras_datadesc_by_name("unsigned long int"));
32      gras_datadesc_struct_append(bw_request_desc,"exp_size",
33                                  gras_datadesc_by_name("unsigned long int"));
34      gras_datadesc_struct_append(bw_request_desc,"msg_size",
35                                  gras_datadesc_by_name("unsigned long int"));
36      gras_datadesc_struct_close(bw_request_desc);
37      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
38
39      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
40      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_close(bw_res_desc);
44      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
45
46      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50      gras_datadesc_struct_close(sat_request_desc);
51      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
52      
53      /* Register the bandwidth messages */
54      gras_msgtype_declare("BW handshake",     bw_request_desc);
55      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
56      gras_msgtype_declare("BW request",       bw_request_desc);
57      gras_msgtype_declare("BW result",        bw_res_desc);
58      
59      /* Register the saturation messages */
60      gras_msgtype_declare("SAT start",   sat_request_desc);
61      gras_msgtype_declare("SAT started", NULL);
62      gras_msgtype_declare("SAT begin",   sat_request_desc);
63      gras_msgtype_declare("SAT begun",   NULL);
64      gras_msgtype_declare("SAT end",     NULL);
65      gras_msgtype_declare("SAT ended",   NULL);
66      gras_msgtype_declare("SAT stop",    NULL);
67      gras_msgtype_declare("SAT stopped", NULL);
68   }
69    
70   /* Register the callbacks */
71   gras_cb_register(gras_msgtype_by_name("BW request"),
72                    &amok_bw_cb_bw_request);
73   gras_cb_register(gras_msgtype_by_name("BW handshake"),
74                    &amok_bw_cb_bw_handshake);
75
76   gras_cb_register(gras_msgtype_by_name("SAT start"),
77                    &amok_bw_cb_sat_start);
78   gras_cb_register(gras_msgtype_by_name("SAT begin"),
79                    &amok_bw_cb_sat_begin);
80   
81   _amok_bw_initialized =1;
82 }
83
84 /** @brief module finalization */
85 void amok_bw_exit(void) {
86   if (! _amok_bw_initialized)
87     return;
88    
89   gras_cb_unregister(gras_msgtype_by_name("BW request"),
90                      &amok_bw_cb_bw_request);
91   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
92                      &amok_bw_cb_bw_handshake);
93
94   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
95                      &amok_bw_cb_sat_start);
96   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
97                      &amok_bw_cb_sat_begin);
98
99   _amok_bw_initialized = 0;
100 }
101
102 /* ***************************************************************************
103  * Bandwidth tests
104  * ***************************************************************************/
105
106 /**
107  * \brief bandwidth measurement between localhost and \e peer
108  * 
109  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
110  * \arg buf_size: Size of the socket buffer
111  * \arg exp_size: Total size of data sent across the network
112  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
113  * \arg sec: where the result (in seconds) should be stored.
114  * \arg bw: observed Bandwidth (in byte/s) 
115  *
116  * Conduct a bandwidth test from the local process to the given peer.
117  * This call is blocking until the end of the experiment.
118  *
119  * Results are reported in last args, and sizes are in byte.
120  */
121 void amok_bw_test(gras_socket_t peer,
122                   unsigned long int buf_size,
123                   unsigned long int exp_size,
124                   unsigned long int msg_size,
125           /*OUT*/ double *sec, double *bw) {
126
127   /* Measurement sockets for the experiments */
128   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
129   int port;
130   bw_request_t request,request_ack;
131   xbt_ex_t e;
132   
133   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
134     TRY {
135       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
136     } CATCH(e) {
137       measMasterIn = NULL;
138       if (port == 10000 -1) {
139         RETHROW0("Error caught while opening a measurement socket: %s");
140       } else {
141         xbt_ex_free(e); 
142       }
143     }
144   }
145   
146   request=xbt_new0(s_bw_request_t,1);
147   request->buf_size=buf_size;
148   request->exp_size=exp_size;
149   request->msg_size=msg_size;
150   request->host.name = NULL;
151   request->host.port = gras_socket_my_port(measMasterIn);
152   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
153         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
154         buf_size,request->buf_size);
155
156   TRY {
157     gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request);
158   } CATCH(e) {
159     RETHROW0("Error encountered while sending the BW request: %s");
160   }
161   measIn = gras_socket_meas_accept(measMasterIn);
162
163   TRY {
164     gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),NULL,&request_ack);
165   } CATCH(e) {
166     RETHROW0("Error encountered while waiting for the answer to BW request: %s");
167   }
168   
169   /* FIXME: What if there is a remote error? */
170    
171   TRY {
172     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
173                                    request_ack->host.port, 
174                                    request->buf_size,1);
175   } CATCH(e) {
176     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
177              gras_socket_peer_name(peer),request_ack->host.port);
178   }
179   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
180
181   *sec=gras_os_time();
182   TRY {
183     gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
184     gras_socket_meas_recv(measIn,120,1,1);
185   } CATCH(e) {
186     gras_socket_close(measOut);
187     gras_socket_close(measMasterIn);
188     gras_socket_close(measIn);
189     RETHROW0("Unable to conduct the experiment: %s");
190   }
191
192   *sec = gras_os_time() - *sec;
193   *bw = ((double)exp_size) / *sec;
194
195   free(request_ack);
196   free(request);
197   if (measIn != measMasterIn)
198     gras_socket_close(measIn);
199   gras_socket_close(measMasterIn);
200   gras_socket_close(measOut);
201 }
202
203
204 /* Callback to the "BW handshake" message: 
205    opens a server measurement socket,
206    indicate its port in an "BW handshaked" message,
207    receive the corresponding data on the measurement socket, 
208    close the measurment socket
209
210    sizes are in byte
211 */
212 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
213                             void          *payload) {
214   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
215   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
216   bw_request_t request=*(bw_request_t*)payload;
217   bw_request_t answer;
218   xbt_ex_t e;
219   int port;
220   
221   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
222         gras_socket_peer_name(expeditor),request->host.port,
223         request->buf_size,request->exp_size,request->msg_size);     
224
225   /* Build our answer */
226   answer = xbt_new0(s_bw_request_t,1);
227   
228   for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
229     TRY {
230       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
231     } CATCH(e) {
232       measMasterIn = NULL;
233       if (port < 10000)
234         xbt_ex_free(e);
235       else
236         /* FIXME: tell error to remote */
237         RETHROW0("Error encountered while opening a measurement server socket: %s");
238     }
239   }
240    
241   answer->buf_size=request->buf_size;
242   answer->exp_size=request->exp_size;
243   answer->msg_size=request->msg_size;
244   answer->host.port=gras_socket_my_port(measMasterIn);
245
246   /* Don't connect asap to leave time to other side to enter the accept() */
247   TRY {
248     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
249                                      request->host.port,
250                                      request->buf_size,1);
251   } CATCH(e) {
252     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
253              gras_socket_peer_name(expeditor),request->host.port);
254     /* FIXME: tell error to remote */
255   }
256
257   TRY {
258     gras_msg_send(expeditor, gras_msgtype_by_name("BW handshake ACK"), &answer);
259   } CATCH(e) { 
260     gras_socket_close(measMasterIn);
261     gras_socket_close(measOut);
262     /* FIXME: tell error to remote */
263     RETHROW0("Error encountered while sending the answer: %s");
264   }
265
266   TRY {
267     measIn = gras_socket_meas_accept(measMasterIn);
268     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
269            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
270
271     gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
272     gras_socket_meas_send(measOut,120,1,1);
273   } CATCH(e) {
274     gras_socket_close(measMasterIn);
275     gras_socket_close(measIn);
276     gras_socket_close(measOut);
277     /* FIXME: tell error to remote ? */
278     RETHROW0("Error encountered while receiving the experiment: %s");
279   }
280
281   if (measIn != measMasterIn)
282     gras_socket_close(measMasterIn);
283   gras_socket_close(measIn);
284   gras_socket_close(measOut);
285   free(answer);
286   free(request);
287   DEBUG0("BW experiment done.");
288   return 1;
289 }
290
291 /**
292  * \brief request a bandwidth measurement between two remote hosts
293  *
294  * \arg from_name: Name of the first host 
295  * \arg from_port: port on which the first process is listening for messages
296  * \arg to_name: Name of the second host 
297  * \arg to_port: port on which the second process is listening (for messages, do not 
298  * give a measurement socket here. The needed measurement sockets will be created 
299  * automatically and negociated between the peers)
300  * \arg buf_size: Size of the socket buffer
301  * \arg exp_size: Total size of data sent across the network
302  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
303  * \arg sec: where the result (in seconds) should be stored.
304  * \arg bw: observed Bandwidth (in byte/s)
305  *
306  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
307  * This call is blocking until the end of the experiment.
308  *
309  * Results are reported in last args, and sizes are in bytes.
310  */
311 void amok_bw_request(const char* from_name,unsigned int from_port,
312                      const char* to_name,unsigned int to_port,
313                      unsigned long int buf_size,
314                      unsigned long int exp_size,
315                      unsigned long int msg_size,
316              /*OUT*/ double *sec, double*bw) {
317   
318   gras_socket_t sock;
319   /* The request */
320   bw_request_t request;
321   bw_res_t result;
322
323   request=xbt_new0(s_bw_request_t,1);
324   request->buf_size=buf_size;
325   request->exp_size=exp_size;
326   request->msg_size=msg_size;
327
328   request->host.name = (char*)to_name;
329   request->host.port = to_port;
330
331   sock = gras_socket_client(from_name,from_port);
332   gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request);
333   free(request);
334
335   gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result);
336   
337   *sec=result->sec;
338   *bw =result->bw;
339
340   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
341         from_name,from_port, to_name,to_port,
342         *sec,*bw);
343
344   gras_socket_close(sock);
345   free(result);
346 }
347
348 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
349                           void            *payload) {
350                           
351   gras_socket_t    expeditor = gras_msg_cb_ctx_from(ctx);
352   /* specification of the test to run, and our answer */
353   bw_request_t request = *(bw_request_t*)payload;
354   bw_res_t result = xbt_new0(s_bw_res,1);
355   gras_socket_t peer;
356
357   peer = gras_socket_client(request->host.name,request->host.port);
358   amok_bw_test(peer,
359                request->buf_size,request->exp_size,request->msg_size,
360                &(result->sec),&(result->bw));
361
362   gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result);
363
364   gras_os_sleep(1);
365   gras_socket_close(peer);
366   free(request);
367   free(result);
368   
369   return 1;
370 }
371
372 int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx,
373                          void             *payload) {
374    CRITICAL0("amok_bw_cb_sat_start; not implemented");
375    return 1;
376
377 int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx,
378                          void             *payload) {
379    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
380    return 1;
381 }