Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Increase an hard-coded timeout since it gave me a bunch of headheaches in the simulations
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   amok_base_init();
29
30   if (! _amok_bw_initialized) {
31     amok_bw_bw_init();
32     amok_bw_sat_init();
33   }
34    
35   amok_bw_bw_join();
36   amok_bw_sat_join();
37
38   _amok_bw_initialized++;
39 }
40
41 /** @brief module finalization */
42 void amok_bw_exit(void) {
43   if (! _amok_bw_initialized)
44     return;
45    
46   amok_bw_bw_leave();
47   amok_bw_sat_leave();
48
49   _amok_bw_initialized--;
50 }
51
52 /* ***************************************************************************
53  * Bandwidth tests
54  * ***************************************************************************/
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
57
58 void amok_bw_bw_init() {
59   gras_datadesc_type_t bw_request_desc, bw_res_desc;
60
61   /* Build the Bandwidth datatype descriptions */ 
62   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63   gras_datadesc_struct_append(bw_request_desc,"host",
64                               gras_datadesc_by_name("s_xbt_host_t"));
65   gras_datadesc_struct_append(bw_request_desc,"buf_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"exp_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"msg_size",
70                               gras_datadesc_by_name("unsigned long int"));
71   gras_datadesc_struct_append(bw_request_desc,"min_duration",
72                               gras_datadesc_by_name("double"));
73   gras_datadesc_struct_close(bw_request_desc);
74   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
75   
76   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
77   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
78   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
79   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
80   gras_datadesc_struct_close(bw_res_desc);
81   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
82   
83   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
84
85   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
86   gras_msgtype_declare("BW stop", NULL);
87
88   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
89 }
90 void amok_bw_bw_join() {
91   gras_cb_register(gras_msgtype_by_name("BW request"),
92                    &amok_bw_cb_bw_request);
93   gras_cb_register(gras_msgtype_by_name("BW handshake"),
94                    &amok_bw_cb_bw_handshake);
95 }
96 void amok_bw_bw_leave() {
97   gras_cb_unregister(gras_msgtype_by_name("BW request"),
98                      &amok_bw_cb_bw_request);
99   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
100                      &amok_bw_cb_bw_handshake);
101 }
102
103 /**
104  * \brief bandwidth measurement between localhost and \e peer
105  * 
106  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
107  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
108  * \arg exp_size: Total size of data sent across the network
109  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
110  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
111  * \arg sec: where the result (in seconds) should be stored.
112  * \arg bw: observed Bandwidth (in byte/s) 
113  *
114  * Conduct a bandwidth test from the local process to the given peer.
115  * This call is blocking until the end of the experiment.
116  *
117  * Results are reported in last args, and sizes are in byte.
118  */
119 void amok_bw_test(gras_socket_t peer,
120                   unsigned long int buf_size,
121                   unsigned long int exp_size,
122                   unsigned long int msg_size,
123                   double min_duration,
124           /*OUT*/ double *sec, double *bw) {
125
126   /* Measurement sockets for the experiments */
127   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
128   int port;
129   bw_request_t request,request_ack;
130   xbt_ex_t e;
131   
132   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
133     TRY {
134       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
135     } CATCH(e) {
136       measMasterIn = NULL;
137       if (port == 10000 -1) {
138         RETHROW0("Error caught while opening a measurement socket: %s");
139       } else {
140         xbt_ex_free(e); 
141       }
142     }
143   }
144   
145   request=xbt_new0(s_bw_request_t,1);
146   request->buf_size=buf_size;
147   request->exp_size=exp_size;
148   request->msg_size=msg_size;
149   request->host.name = NULL;
150   request->host.port = gras_socket_my_port(measMasterIn);
151   DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
152         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
153         buf_size,request->buf_size);
154
155   TRY {
156     gras_msg_rpccall(peer,60,
157                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
158   } CATCH(e) {
159     RETHROW0("Error encountered while sending the BW request: %s");
160   }
161   measIn = gras_socket_meas_accept(measMasterIn);
162    
163   TRY {
164     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
165                                    request_ack->host.port, 
166                                    request->buf_size,1);
167   } CATCH(e) {
168     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
169              gras_socket_peer_name(peer),request_ack->host.port);
170   }
171   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
172
173   *sec = 0;
174   do {
175     if (*sec>0) {
176       double meas_duration=*sec;
177       request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
178
179       DEBUG4("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld (got %fkb/s)",
180              meas_duration,min_duration,request->exp_size,((double)exp_size) / *sec/1024);
181       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
182     }
183
184     *sec=gras_os_time();
185     TRY {
186       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
187       DEBUG0("Data sent. Wait ACK");
188       gras_socket_meas_recv(measIn,120,1,1);
189     } CATCH(e) {
190       gras_socket_close(measOut);
191       gras_socket_close(measMasterIn);
192       gras_socket_close(measIn);
193       RETHROW0("Unable to conduct the experiment: %s");
194     }
195     DEBUG0("Experiment done");
196
197     *sec = gras_os_time() - *sec;
198     *bw = ((double)exp_size) / *sec;
199   } while (*sec < min_duration);
200
201   DEBUG0("This measurement was long enough. Stop peer");
202   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
203
204   free(request_ack);
205   free(request);
206   if (measIn != measMasterIn)
207     gras_socket_close(measIn);
208   gras_socket_close(measMasterIn);
209   gras_socket_close(measOut);
210 }
211
212
213 /* Callback to the "BW handshake" message: 
214    opens a server measurement socket,
215    indicate its port in an "BW handshaked" message,
216    receive the corresponding data on the measurement socket, 
217    close the measurment socket
218
219    sizes are in byte
220 */
221 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
222                             void          *payload) {
223   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
224   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
225   bw_request_t request=*(bw_request_t*)payload;
226   bw_request_t answer;
227   xbt_ex_t e;
228   int port;
229   int tooshort = 1;
230   gras_msg_cb_ctx_t ctx_reask;
231   static xbt_dynar_t msgtwaited=NULL;
232   
233   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
234         gras_socket_peer_name(expeditor),request->host.port,
235         request->buf_size,request->exp_size,request->msg_size);     
236
237   /* Build our answer */
238   answer = xbt_new0(s_bw_request_t,1);
239   
240   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
241     TRY {
242       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
243     } CATCH(e) {
244       measMasterIn = NULL;
245       if (port < 10000)
246         xbt_ex_free(e);
247       else
248         /* FIXME: tell error to remote */
249         RETHROW0("Error encountered while opening a measurement server socket: %s");
250     }
251   }
252    
253   answer->buf_size=request->buf_size;
254   answer->exp_size=request->exp_size;
255   answer->msg_size=request->msg_size;
256   answer->host.port=gras_socket_my_port(measMasterIn);
257
258   TRY {
259     gras_msg_rpcreturn(60,ctx,&answer);
260   } CATCH(e) { 
261     gras_socket_close(measMasterIn);
262     /* FIXME: tell error to remote */
263     RETHROW0("Error encountered while sending the answer: %s");
264   }
265
266
267   /* Don't connect asap to leave time to other side to enter the accept() */
268   TRY {
269     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
270                                      request->host.port,
271                                      request->buf_size,1);
272   } CATCH(e) {
273     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
274              gras_socket_peer_name(expeditor),request->host.port);
275     /* FIXME: tell error to remote */
276   }
277
278   TRY {
279     measIn = gras_socket_meas_accept(measMasterIn);
280     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
281            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
282   } CATCH(e) {
283     gras_socket_close(measMasterIn);
284     gras_socket_close(measIn);
285     gras_socket_close(measOut);
286     /* FIXME: tell error to remote ? */
287     RETHROW0("Error encountered while opening the meas socket: %s");
288   }
289
290   if (!msgtwaited) {
291     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
292     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
293     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
294   }
295
296   while (tooshort) {
297     void *payload;
298     int msggot;
299     TRY {
300       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
301       gras_socket_meas_send(measOut,120,1,1);
302       DEBUG0("ACK sent");
303     } CATCH(e) {
304       gras_socket_close(measMasterIn);
305       gras_socket_close(measIn);
306       gras_socket_close(measOut);
307       /* FIXME: tell error to remote ? */
308       RETHROW0("Error encountered while receiving the experiment: %s");
309     }
310     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
311     switch(msggot) {
312     case 0: /* BW stop */
313       tooshort = 0;
314       break;
315     case 1: /* BW reask */
316       tooshort = 1;
317       free(request);
318       request = (bw_request_t)payload;
319       VERB0("Return the reasking RPC");
320       gras_msg_rpcreturn(60,ctx_reask,NULL);
321     }
322     gras_msg_cb_ctx_free(ctx_reask);
323   }
324
325   if (measIn != measMasterIn)
326     gras_socket_close(measMasterIn);
327   gras_socket_close(measIn);
328   gras_socket_close(measOut);
329   free(answer);
330   free(request);
331   VERB0("BW experiment done.");
332   return 1;
333 }
334
335 /**
336  * \brief request a bandwidth measurement between two remote hosts
337  *
338  * \arg from_name: Name of the first host 
339  * \arg from_port: port on which the first process is listening for messages
340  * \arg to_name: Name of the second host 
341  * \arg to_port: port on which the second process is listening (for messages, do not 
342  * give a measurement socket here. The needed measurement sockets will be created 
343  * automatically and negociated between the peers)
344  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
345  * \arg exp_size: Total size of data sent across the network
346  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
347  * \arg sec: where the result (in seconds) should be stored.
348  * \arg bw: observed Bandwidth (in byte/s)
349  *
350  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
351  * This call is blocking until the end of the experiment.
352  *
353  * Results are reported in last args, and sizes are in bytes.
354  */
355 void amok_bw_request(const char* from_name,unsigned int from_port,
356                      const char* to_name,unsigned int to_port,
357                      unsigned long int buf_size,
358                      unsigned long int exp_size,
359                      unsigned long int msg_size,
360                      double min_duration,
361              /*OUT*/ double *sec, double*bw) {
362   
363   gras_socket_t sock;
364   /* The request */
365   bw_request_t request;
366   bw_res_t result;
367
368   request=xbt_new0(s_bw_request_t,1);
369   request->buf_size=buf_size;
370   request->exp_size=exp_size;
371   request->msg_size=msg_size;
372   request->min_duration = min_duration;
373
374   request->host.name = (char*)to_name;
375   request->host.port = to_port;
376
377   sock = gras_socket_client(from_name,from_port);
378   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
379
380   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
381   
382   if (sec)
383     *sec=result->sec;
384   if (bw)
385     *bw =result->bw;
386
387   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
388         from_name,from_port, to_name,to_port,
389         result->sec,((double)result->bw)/1024.0);
390
391   gras_socket_close(sock);
392   free(result);
393   free(request);
394 }
395
396 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
397                           void            *payload) {
398                           
399   /* specification of the test to run, and our answer */
400   bw_request_t request = *(bw_request_t*)payload;
401   bw_res_t result = xbt_new0(s_bw_res_t,1);
402   gras_socket_t peer,asker;
403
404   asker=gras_msg_cb_ctx_from(ctx);
405   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
406         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
407         request->host.name,request->host.port);
408   peer = gras_socket_client(request->host.name,request->host.port);
409   amok_bw_test(peer,
410                request->buf_size,request->exp_size,request->msg_size,
411                request->min_duration,
412                &(result->sec),&(result->bw));
413
414   gras_msg_rpcreturn(240,ctx,&result);
415
416   gras_os_sleep(1);
417   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
418   free(request->host.name);
419   free(request);
420   free(result);
421   
422   return 1;
423 }
424
425 /** \brief builds a matrix of results of bandwidth measurement */
426 double * amok_bw_matrix(xbt_dynar_t hosts,
427                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
428                         double min_duration) { 
429   double sec;
430   /* construction of matrices for bandwith and latency */
431
432
433   int i,j,len=xbt_dynar_length(hosts);
434
435   double *matrix_res = xbt_new0(double, len*len);
436   xbt_host_t h1,h2;
437
438   xbt_dynar_foreach (hosts,i,h1) {
439     xbt_dynar_foreach (hosts,j,h2) {
440       if (i!=j) {
441         /* Mesurements of Bandwidth */
442         amok_bw_request(h1->name,h1->port,h2->name,h2->port,
443                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
444                         &sec,&matrix_res[i*len + j]);
445       } 
446     }
447   }
448   return matrix_res;
449 }