Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Welcome to modernity, testing infrastructure
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
17
18 static short _amok_bw_initialized = 0;
19
20 /** @brief module initialization; all participating nodes must run this */
21 void amok_bw_init(void) {
22   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
23
24   amok_base_init();
25
26   if (! _amok_bw_initialized) {
27         
28      /* Build the Bandwidth datatype descriptions */ 
29      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
30      gras_datadesc_struct_append(bw_request_desc,"host",
31                                  gras_datadesc_by_name("xbt_host_t"));
32      gras_datadesc_struct_append(bw_request_desc,"buf_size",
33                                  gras_datadesc_by_name("unsigned long int"));
34      gras_datadesc_struct_append(bw_request_desc,"exp_size",
35                                  gras_datadesc_by_name("unsigned long int"));
36      gras_datadesc_struct_append(bw_request_desc,"msg_size",
37                                  gras_datadesc_by_name("unsigned long int"));
38      gras_datadesc_struct_close(bw_request_desc);
39      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
40
41      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
42      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
43      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
44      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
45      gras_datadesc_struct_close(bw_res_desc);
46      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
47
48      gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
49      gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
50
51      /* Build the saturation datatype descriptions */ 
52      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
53      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
54      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
55      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
56      gras_datadesc_struct_close(sat_request_desc);
57      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
58      
59      /* Register the saturation messages */
60      gras_msgtype_declare("SAT start",   sat_request_desc);
61      gras_msgtype_declare("SAT started", NULL);
62      gras_msgtype_declare("SAT begin",   sat_request_desc);
63      gras_msgtype_declare("SAT begun",   NULL);
64      gras_msgtype_declare("SAT end",     NULL);
65      gras_msgtype_declare("SAT ended",   NULL);
66      gras_msgtype_declare("SAT stop",    NULL);
67      gras_msgtype_declare("SAT stopped", NULL);
68   }
69    
70   /* Register the callbacks */
71   gras_cb_register(gras_msgtype_by_name("BW request"),
72                    &amok_bw_cb_bw_request);
73   gras_cb_register(gras_msgtype_by_name("BW handshake"),
74                    &amok_bw_cb_bw_handshake);
75
76   gras_cb_register(gras_msgtype_by_name("SAT start"),
77                    &amok_bw_cb_sat_start);
78   gras_cb_register(gras_msgtype_by_name("SAT begin"),
79                    &amok_bw_cb_sat_begin);
80   
81   _amok_bw_initialized =1;
82 }
83
84 /** @brief module finalization */
85 void amok_bw_exit(void) {
86   if (! _amok_bw_initialized)
87     return;
88    
89   gras_cb_unregister(gras_msgtype_by_name("BW request"),
90                      &amok_bw_cb_bw_request);
91   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
92                      &amok_bw_cb_bw_handshake);
93
94   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
95                      &amok_bw_cb_sat_start);
96   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
97                      &amok_bw_cb_sat_begin);
98
99   _amok_bw_initialized = 0;
100 }
101
102 /* ***************************************************************************
103  * Bandwidth tests
104  * ***************************************************************************/
105
106 /**
107  * \brief bandwidth measurement between localhost and \e peer
108  * 
109  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
110  * \arg buf_size: Size of the socket buffer
111  * \arg exp_size: Total size of data sent across the network
112  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
113  * \arg sec: where the result (in seconds) should be stored.
114  * \arg bw: observed Bandwidth (in byte/s) 
115  *
116  * Conduct a bandwidth test from the local process to the given peer.
117  * This call is blocking until the end of the experiment.
118  *
119  * Results are reported in last args, and sizes are in byte.
120  */
121 void amok_bw_test(gras_socket_t peer,
122                   unsigned long int buf_size,
123                   unsigned long int exp_size,
124                   unsigned long int msg_size,
125           /*OUT*/ double *sec, double *bw) {
126
127   /* Measurement sockets for the experiments */
128   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
129   int port;
130   bw_request_t request,request_ack;
131   xbt_ex_t e;
132   
133   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
134     TRY {
135       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
136     } CATCH(e) {
137       measMasterIn = NULL;
138       if (port == 10000 -1) {
139         RETHROW0("Error caught while opening a measurement socket: %s");
140       } else {
141         xbt_ex_free(e); 
142       }
143     }
144   }
145   
146   request=xbt_new0(s_bw_request_t,1);
147   request->buf_size=buf_size;
148   request->exp_size=exp_size;
149   request->msg_size=msg_size;
150   request->host.name = NULL;
151   request->host.port = gras_socket_my_port(measMasterIn);
152   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
153         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
154         buf_size,request->buf_size);
155
156   TRY {
157     gras_msg_rpccall(peer,60,
158                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
159   } CATCH(e) {
160     RETHROW0("Error encountered while sending the BW request: %s");
161   }
162   measIn = gras_socket_meas_accept(measMasterIn);
163    
164   TRY {
165     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
166                                    request_ack->host.port, 
167                                    request->buf_size,1);
168   } CATCH(e) {
169     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
170              gras_socket_peer_name(peer),request_ack->host.port);
171   }
172   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
173
174   *sec=gras_os_time();
175   TRY {
176     gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
177     gras_socket_meas_recv(measIn,120,1,1);
178   } CATCH(e) {
179     gras_socket_close(measOut);
180     gras_socket_close(measMasterIn);
181     gras_socket_close(measIn);
182     RETHROW0("Unable to conduct the experiment: %s");
183   }
184
185   *sec = gras_os_time() - *sec;
186   *bw = ((double)exp_size) / *sec;
187
188   free(request_ack);
189   free(request);
190   if (measIn != measMasterIn)
191     gras_socket_close(measIn);
192   gras_socket_close(measMasterIn);
193   gras_socket_close(measOut);
194 }
195
196
197 /* Callback to the "BW handshake" message: 
198    opens a server measurement socket,
199    indicate its port in an "BW handshaked" message,
200    receive the corresponding data on the measurement socket, 
201    close the measurment socket
202
203    sizes are in byte
204 */
205 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
206                             void          *payload) {
207   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
208   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
209   bw_request_t request=*(bw_request_t*)payload;
210   bw_request_t answer;
211   xbt_ex_t e;
212   int port;
213   
214   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
215         gras_socket_peer_name(expeditor),request->host.port,
216         request->buf_size,request->exp_size,request->msg_size);     
217
218   /* Build our answer */
219   answer = xbt_new0(s_bw_request_t,1);
220   
221   for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
222     TRY {
223       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
224     } CATCH(e) {
225       measMasterIn = NULL;
226       if (port < 10000)
227         xbt_ex_free(e);
228       else
229         /* FIXME: tell error to remote */
230         RETHROW0("Error encountered while opening a measurement server socket: %s");
231     }
232   }
233    
234   answer->buf_size=request->buf_size;
235   answer->exp_size=request->exp_size;
236   answer->msg_size=request->msg_size;
237   answer->host.port=gras_socket_my_port(measMasterIn);
238
239
240
241   TRY {
242     gras_msg_rpcreturn(60,ctx,&answer);
243   } CATCH(e) { 
244     gras_socket_close(measMasterIn);
245     /* FIXME: tell error to remote */
246     RETHROW0("Error encountered while sending the answer: %s");
247   }
248
249
250   /* Don't connect asap to leave time to other side to enter the accept() */
251   TRY {
252     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
253                                      request->host.port,
254                                      request->buf_size,1);
255   } CATCH(e) {
256     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
257              gras_socket_peer_name(expeditor),request->host.port);
258     /* FIXME: tell error to remote */
259   }
260
261   TRY {
262     measIn = gras_socket_meas_accept(measMasterIn);
263     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
264            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
265
266     gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
267     gras_socket_meas_send(measOut,120,1,1);
268   } CATCH(e) {
269     gras_socket_close(measMasterIn);
270     gras_socket_close(measIn);
271     gras_socket_close(measOut);
272     /* FIXME: tell error to remote ? */
273     RETHROW0("Error encountered while receiving the experiment: %s");
274   }
275
276   if (measIn != measMasterIn)
277     gras_socket_close(measMasterIn);
278   gras_socket_close(measIn);
279   gras_socket_close(measOut);
280   free(answer);
281   free(request);
282   DEBUG0("BW experiment done.");
283   return 1;
284 }
285
286 /**
287  * \brief request a bandwidth measurement between two remote hosts
288  *
289  * \arg from_name: Name of the first host 
290  * \arg from_port: port on which the first process is listening for messages
291  * \arg to_name: Name of the second host 
292  * \arg to_port: port on which the second process is listening (for messages, do not 
293  * give a measurement socket here. The needed measurement sockets will be created 
294  * automatically and negociated between the peers)
295  * \arg buf_size: Size of the socket buffer
296  * \arg exp_size: Total size of data sent across the network
297  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
298  * \arg sec: where the result (in seconds) should be stored.
299  * \arg bw: observed Bandwidth (in byte/s)
300  *
301  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
302  * This call is blocking until the end of the experiment.
303  *
304  * Results are reported in last args, and sizes are in bytes.
305  */
306 void amok_bw_request(const char* from_name,unsigned int from_port,
307                      const char* to_name,unsigned int to_port,
308                      unsigned long int buf_size,
309                      unsigned long int exp_size,
310                      unsigned long int msg_size,
311              /*OUT*/ double *sec, double*bw) {
312   
313   gras_socket_t sock;
314   /* The request */
315   bw_request_t request;
316   bw_res_t result;
317
318   request=xbt_new0(s_bw_request_t,1);
319   request->buf_size=buf_size;
320   request->exp_size=exp_size;
321   request->msg_size=msg_size;
322
323   request->host.name = (char*)to_name;
324   request->host.port = to_port;
325
326   sock = gras_socket_client(from_name,from_port);
327   gras_msg_rpccall(sock,240,gras_msgtype_by_name("BW request"),&request, &result);
328   
329   *sec=result->sec;
330   *bw =result->bw;
331
332   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
333         from_name,from_port, to_name,to_port,
334         *sec,*bw);
335
336   gras_socket_close(sock);
337   free(result);
338 }
339
340 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
341                           void            *payload) {
342                           
343   /* specification of the test to run, and our answer */
344   bw_request_t request = *(bw_request_t*)payload;
345   bw_res_t result = xbt_new0(s_bw_res,1);
346   gras_socket_t peer;
347
348   peer = gras_socket_client(request->host.name,request->host.port);
349   amok_bw_test(peer,
350                request->buf_size,request->exp_size,request->msg_size,
351                &(result->sec),&(result->bw));
352
353   gras_msg_rpcreturn(240,ctx,&result);
354
355   gras_os_sleep(1);
356   gras_socket_close(peer);
357   free(request);
358   free(result);
359   
360   return 1;
361 }
362
363 int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx,
364                          void             *payload) {
365    CRITICAL0("amok_bw_cb_sat_start; not implemented");
366    return 1;
367
368 int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx,
369                          void             *payload) {
370    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
371    return 1;
372 }
373
374 double * amok_bw_matrix(xbt_dynar_t hosts,
375                          int buf_size_bw, int exp_size_bw, int msg_size_bw) { 
376   double sec;
377   /* construct of matrixs for bandwith and Latency */
378
379
380   double *matrix_bw;   /* matrix bandwidth */
381   int i,j,len=xbt_dynar_length(hosts);
382
383   matrix_bw = (double *) malloc(sizeof(double)* len*len);
384
385   xbt_host_t h1,h2;
386
387   h1 = xbt_new(s_xbt_host_t,1);
388   h2 = xbt_new(s_xbt_host_t,1);
389   i=0;
390   xbt_dynar_foreach (hosts,i,h1) {
391     j=0;
392     xbt_dynar_foreach (hosts,j,h2) {
393       if(i!=j) {
394         /* Mesurements of Bandwidth */
395         amok_bw_request(h1->name,h1->port,h2->name,h2->port,
396                         buf_size_bw,exp_size_bw,msg_size_bw,&sec,&matrix_bw[i*len + j]);
397       } else {
398         matrix_bw[i*len +j] = 0.0;
399       }
400
401     }
402   }
403   free(h1);
404   free(h2);
405   return matrix_bw;
406 }