Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
- Also adapt the message size when experiment is too short to avoid
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   amok_base_init();
29
30   if (! _amok_bw_initialized) {
31     amok_bw_bw_init();
32     amok_bw_sat_init();
33   }
34    
35   amok_bw_bw_join();
36   amok_bw_sat_join();
37
38   _amok_bw_initialized++;
39 }
40
41 /** @brief module finalization */
42 void amok_bw_exit(void) {
43   if (! _amok_bw_initialized)
44     return;
45    
46   amok_bw_bw_leave();
47   amok_bw_sat_leave();
48
49   _amok_bw_initialized--;
50 }
51
52 /* ***************************************************************************
53  * Bandwidth tests
54  * ***************************************************************************/
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
57
58 void amok_bw_bw_init() {
59   gras_datadesc_type_t bw_request_desc, bw_res_desc;
60
61   /* Build the Bandwidth datatype descriptions */ 
62   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63   gras_datadesc_struct_append(bw_request_desc,"host",
64                               gras_datadesc_by_name("s_xbt_host_t"));
65   gras_datadesc_struct_append(bw_request_desc,"buf_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"exp_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"msg_size",
70                               gras_datadesc_by_name("unsigned long int"));
71   gras_datadesc_struct_append(bw_request_desc,"min_duration",
72                               gras_datadesc_by_name("double"));
73   gras_datadesc_struct_close(bw_request_desc);
74   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
75   
76   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
77   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
78   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
79   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
80   gras_datadesc_struct_close(bw_res_desc);
81   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
82   
83   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
84
85   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
86   gras_msgtype_declare("BW stop", NULL);
87
88   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
89 }
90 void amok_bw_bw_join() {
91   gras_cb_register(gras_msgtype_by_name("BW request"),
92                    &amok_bw_cb_bw_request);
93   gras_cb_register(gras_msgtype_by_name("BW handshake"),
94                    &amok_bw_cb_bw_handshake);
95 }
96 void amok_bw_bw_leave() {
97   gras_cb_unregister(gras_msgtype_by_name("BW request"),
98                      &amok_bw_cb_bw_request);
99   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
100                      &amok_bw_cb_bw_handshake);
101 }
102
103 /**
104  * \brief bandwidth measurement between localhost and \e peer
105  * 
106  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
107  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
108  * \arg exp_size: Total size of data sent across the network
109  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
110  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
111  * \arg sec: where the result (in seconds) should be stored.
112  * \arg bw: observed Bandwidth (in byte/s) 
113  *
114  * Conduct a bandwidth test from the local process to the given peer.
115  * This call is blocking until the end of the experiment.
116  *
117  * Results are reported in last args, and sizes are in byte.
118  */
119 void amok_bw_test(gras_socket_t peer,
120                   unsigned long int buf_size,
121                   unsigned long int exp_size,
122                   unsigned long int msg_size,
123                   double min_duration,
124           /*OUT*/ double *sec, double *bw) {
125
126   /* Measurement sockets for the experiments */
127   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
128   int port;
129   bw_request_t request,request_ack;
130   xbt_ex_t e;
131   
132   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
133     TRY {
134       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
135     } CATCH(e) {
136       measMasterIn = NULL;
137       if (port == 10000 -1) {
138         RETHROW0("Error caught while opening a measurement socket: %s");
139       } else {
140         xbt_ex_free(e); 
141       }
142     }
143   }
144   
145   request=xbt_new0(s_bw_request_t,1);
146   request->buf_size=buf_size;
147   request->exp_size=exp_size;
148   request->msg_size=msg_size;
149   request->host.name = NULL;
150   request->host.port = gras_socket_my_port(measMasterIn);
151   DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
152         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
153         buf_size,request->buf_size);
154
155   TRY {
156     gras_msg_rpccall(peer,15,
157                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
158   } CATCH(e) {
159     RETHROW0("Error encountered while sending the BW request: %s");
160   }
161   measIn = gras_socket_meas_accept(measMasterIn);
162    
163   TRY {
164     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
165                                    request_ack->host.port, 
166                                    request->buf_size,1);
167   } CATCH(e) {
168     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
169              gras_socket_peer_name(peer),request_ack->host.port);
170   }
171   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
172
173   *sec = 0;
174   do {
175     if (*sec>0) {
176       double meas_duration=*sec;
177       request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
178       request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
179
180
181       DEBUG5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
182              meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
183       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
184       DEBUG0("Peer is ready for another round of fun");
185     }
186
187     *sec=gras_os_time();
188     TRY {
189       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
190       DEBUG0("Data sent. Wait ACK");
191       gras_socket_meas_recv(measIn,120,1,1);
192     } CATCH(e) {
193       gras_socket_close(measOut);
194       gras_socket_close(measMasterIn);
195       gras_socket_close(measIn);
196       RETHROW0("Unable to conduct the experiment: %s");
197     }
198     DEBUG0("Experiment done");
199
200     *sec = gras_os_time() - *sec;
201     *bw = ((double)request->exp_size) / *sec;
202   } while (*sec < min_duration);
203
204   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
205          *sec,*bw);
206   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
207
208   free(request_ack);
209   free(request);
210   if (measIn != measMasterIn)
211     gras_socket_close(measIn);
212   gras_socket_close(measMasterIn);
213   gras_socket_close(measOut);
214 }
215
216
217 /* Callback to the "BW handshake" message: 
218    opens a server measurement socket,
219    indicate its port in an "BW handshaked" message,
220    receive the corresponding data on the measurement socket, 
221    close the measurment socket
222
223    sizes are in byte
224 */
225 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
226                             void          *payload) {
227   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
228   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
229   bw_request_t request=*(bw_request_t*)payload;
230   bw_request_t answer;
231   xbt_ex_t e;
232   int port;
233   int tooshort = 1;
234   gras_msg_cb_ctx_t ctx_reask;
235   static xbt_dynar_t msgtwaited=NULL;
236   
237   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
238         gras_socket_peer_name(expeditor),request->host.port,
239         request->buf_size,request->exp_size,request->msg_size);     
240
241   /* Build our answer */
242   answer = xbt_new0(s_bw_request_t,1);
243   
244   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
245     TRY {
246       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
247     } CATCH(e) {
248       measMasterIn = NULL;
249       if (port < 10000)
250         xbt_ex_free(e);
251       else
252         /* FIXME: tell error to remote */
253         RETHROW0("Error encountered while opening a measurement server socket: %s");
254     }
255   }
256    
257   answer->buf_size=request->buf_size;
258   answer->exp_size=request->exp_size;
259   answer->msg_size=request->msg_size;
260   answer->host.port=gras_socket_my_port(measMasterIn);
261
262   TRY {
263     gras_msg_rpcreturn(60,ctx,&answer);
264   } CATCH(e) { 
265     gras_socket_close(measMasterIn);
266     /* FIXME: tell error to remote */
267     RETHROW0("Error encountered while sending the answer: %s");
268   }
269
270
271   /* Don't connect asap to leave time to other side to enter the accept() */
272   TRY {
273     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
274                                      request->host.port,
275                                      request->buf_size,1);
276   } CATCH(e) {
277     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
278              gras_socket_peer_name(expeditor),request->host.port);
279     /* FIXME: tell error to remote */
280   }
281
282   TRY {
283     measIn = gras_socket_meas_accept(measMasterIn);
284     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
285            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
286   } CATCH(e) {
287     gras_socket_close(measMasterIn);
288     gras_socket_close(measIn);
289     gras_socket_close(measOut);
290     /* FIXME: tell error to remote ? */
291     RETHROW0("Error encountered while opening the meas socket: %s");
292   }
293
294   if (!msgtwaited) {
295     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
296     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
297     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
298   }
299
300   while (tooshort) {
301     void *payload;
302     int msggot;
303     TRY {
304       DEBUG0("Recv / Send the experiment");
305       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
306       gras_socket_meas_send(measOut,120,1,1);
307       DEBUG0("ACK sent");
308     } CATCH(e) {
309       gras_socket_close(measMasterIn);
310       gras_socket_close(measIn);
311       gras_socket_close(measOut);
312       /* FIXME: tell error to remote ? */
313       RETHROW0("Error encountered while receiving the experiment: %s");
314     }
315     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
316     switch(msggot) {
317     case 0: /* BW stop */
318       tooshort = 0;
319       break;
320     case 1: /* BW reask */
321       tooshort = 1;
322       free(request);
323       request = (bw_request_t)payload;
324       VERB0("Return the reasking RPC");
325       gras_msg_rpcreturn(60,ctx_reask,NULL);
326     }
327     gras_msg_cb_ctx_free(ctx_reask);
328   }
329
330   if (measIn != measMasterIn)
331     gras_socket_close(measMasterIn);
332   gras_socket_close(measIn);
333   gras_socket_close(measOut);
334   free(answer);
335   free(request);
336   VERB0("BW experiment done.");
337   return 1;
338 }
339
340 /**
341  * \brief request a bandwidth measurement between two remote hosts
342  *
343  * \arg from_name: Name of the first host 
344  * \arg from_port: port on which the first process is listening for messages
345  * \arg to_name: Name of the second host 
346  * \arg to_port: port on which the second process is listening (for messages, do not 
347  * give a measurement socket here. The needed measurement sockets will be created 
348  * automatically and negociated between the peers)
349  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
350  * \arg exp_size: Total size of data sent across the network
351  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
352  * \arg sec: where the result (in seconds) should be stored.
353  * \arg bw: observed Bandwidth (in byte/s)
354  *
355  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
356  * This call is blocking until the end of the experiment.
357  *
358  * Results are reported in last args, and sizes are in bytes.
359  */
360 void amok_bw_request(const char* from_name,unsigned int from_port,
361                      const char* to_name,unsigned int to_port,
362                      unsigned long int buf_size,
363                      unsigned long int exp_size,
364                      unsigned long int msg_size,
365                      double min_duration,
366              /*OUT*/ double *sec, double*bw) {
367   
368   gras_socket_t sock;
369   /* The request */
370   bw_request_t request;
371   bw_res_t result;
372
373   request=xbt_new0(s_bw_request_t,1);
374   request->buf_size=buf_size;
375   request->exp_size=exp_size;
376   request->msg_size=msg_size;
377   request->min_duration = min_duration;
378
379   request->host.name = (char*)to_name;
380   request->host.port = to_port;
381
382   sock = gras_socket_client(from_name,from_port);
383   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
384
385   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
386   
387   if (sec)
388     *sec=result->sec;
389   if (bw)
390     *bw =result->bw;
391
392   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
393         from_name,from_port, to_name,to_port,
394         result->sec,((double)result->bw)/1024.0);
395
396   gras_socket_close(sock);
397   free(result);
398   free(request);
399 }
400
401 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
402                           void            *payload) {
403                           
404   /* specification of the test to run, and our answer */
405   bw_request_t request = *(bw_request_t*)payload;
406   bw_res_t result = xbt_new0(s_bw_res_t,1);
407   gras_socket_t peer,asker;
408
409   asker=gras_msg_cb_ctx_from(ctx);
410   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
411         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
412         request->host.name,request->host.port);
413   peer = gras_socket_client(request->host.name,request->host.port);
414   amok_bw_test(peer,
415                request->buf_size,request->exp_size,request->msg_size,
416                request->min_duration,
417                &(result->sec),&(result->bw));
418
419   gras_msg_rpcreturn(240,ctx,&result);
420
421   gras_os_sleep(1);
422   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
423   free(request->host.name);
424   free(request);
425   free(result);
426   
427   return 1;
428 }
429
430 /** \brief builds a matrix of results of bandwidth measurement */
431 double * amok_bw_matrix(xbt_dynar_t hosts,
432                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
433                         double min_duration) { 
434   double sec;
435   /* construction of matrices for bandwith and latency */
436
437
438   int i,j,len=xbt_dynar_length(hosts);
439
440   double *matrix_res = xbt_new0(double, len*len);
441   xbt_host_t h1,h2;
442
443   xbt_dynar_foreach (hosts,i,h1) {
444     xbt_dynar_foreach (hosts,j,h2) {
445       if (i!=j) {
446         /* Mesurements of Bandwidth */
447         amok_bw_request(h1->name,h1->port,h2->name,h2->port,
448                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
449                         &sec,&matrix_res[i*len + j]);
450       } 
451     }
452   }
453   return matrix_res;
454 }