Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
allow to autocompute msgsize to achieve Real Saturations
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   amok_base_init();
29
30   if (! _amok_bw_initialized) {
31     amok_bw_bw_init();
32     amok_bw_sat_init();
33   }
34    
35   amok_bw_bw_join();
36   amok_bw_sat_join();
37
38   _amok_bw_initialized++;
39 }
40
41 /** @brief module finalization */
42 void amok_bw_exit(void) {
43   if (! _amok_bw_initialized)
44     return;
45    
46   amok_bw_bw_leave();
47   amok_bw_sat_leave();
48
49   _amok_bw_initialized--;
50 }
51
52 /* ***************************************************************************
53  * Bandwidth tests
54  * ***************************************************************************/
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
57
58 void amok_bw_bw_init() {
59   gras_datadesc_type_t bw_request_desc, bw_res_desc;
60
61   /* Build the Bandwidth datatype descriptions */ 
62   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63   gras_datadesc_struct_append(bw_request_desc,"host",
64                               gras_datadesc_by_name("s_xbt_host_t"));
65   gras_datadesc_struct_append(bw_request_desc,"buf_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"exp_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"msg_size",
70                               gras_datadesc_by_name("unsigned long int"));
71   gras_datadesc_struct_append(bw_request_desc,"min_duration",
72                               gras_datadesc_by_name("double"));
73   gras_datadesc_struct_close(bw_request_desc);
74   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
75   
76   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
77   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
78   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
79   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
80   gras_datadesc_struct_close(bw_res_desc);
81   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
82   
83   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
84
85   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
86   gras_msgtype_declare("BW stop", NULL);
87
88   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
89 }
90 void amok_bw_bw_join() {
91   gras_cb_register(gras_msgtype_by_name("BW request"),
92                    &amok_bw_cb_bw_request);
93   gras_cb_register(gras_msgtype_by_name("BW handshake"),
94                    &amok_bw_cb_bw_handshake);
95 }
96 void amok_bw_bw_leave() {
97   gras_cb_unregister(gras_msgtype_by_name("BW request"),
98                      &amok_bw_cb_bw_request);
99   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
100                      &amok_bw_cb_bw_handshake);
101 }
102
103 /**
104  * \brief bandwidth measurement between localhost and \e peer
105  * 
106  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
107  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
108  * \arg exp_size: Total size of data sent across the network
109  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
110  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
111  * \arg sec: where the result (in seconds) should be stored.
112  * \arg bw: observed Bandwidth (in byte/s) 
113  *
114  * Conduct a bandwidth test from the local process to the given peer.
115  * This call is blocking until the end of the experiment.
116  *
117  * Results are reported in last args, and sizes are in byte.
118  */
119 void amok_bw_test(gras_socket_t peer,
120                   unsigned long int buf_size,
121                   unsigned long int exp_size,
122                   unsigned long int msg_size,
123                   double min_duration,
124           /*OUT*/ double *sec, double *bw) {
125
126   /* Measurement sockets for the experiments */
127   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
128   int port;
129   bw_request_t request,request_ack;
130   xbt_ex_t e;
131   
132   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
133     TRY {
134       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
135     } CATCH(e) {
136       measMasterIn = NULL;
137       if (port == 10000 -1) {
138         RETHROW0("Error caught while opening a measurement socket: %s");
139       } else {
140         xbt_ex_free(e); 
141       }
142     }
143   }
144   
145   request=xbt_new0(s_bw_request_t,1);
146   request->buf_size=buf_size;
147   request->exp_size=exp_size;
148   request->msg_size=msg_size;
149   request->host.name = NULL;
150   request->host.port = gras_socket_my_port(measMasterIn);
151   DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
152         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
153         buf_size,request->buf_size);
154
155   TRY {
156     gras_msg_rpccall(peer,15,
157                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
158   } CATCH(e) {
159     RETHROW0("Error encountered while sending the BW request: %s");
160   }
161   measIn = gras_socket_meas_accept(measMasterIn);
162    
163   TRY {
164     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
165                                    request_ack->host.port, 
166                                    request->buf_size,1);
167   } CATCH(e) {
168     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
169              gras_socket_peer_name(peer),request_ack->host.port);
170   }
171   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
172
173   *sec = 0;
174   do {
175     if (*sec>0) {
176       double meas_duration=*sec;
177       request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
178
179       DEBUG4("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld (got %fkb/s)",
180              meas_duration,min_duration,request->exp_size,((double)exp_size) / *sec/1024);
181       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
182       DEBUG0("Peer is ready for another round of fun");
183     }
184
185     *sec=gras_os_time();
186     TRY {
187       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
188       DEBUG0("Data sent. Wait ACK");
189       gras_socket_meas_recv(measIn,120,1,1);
190     } CATCH(e) {
191       gras_socket_close(measOut);
192       gras_socket_close(measMasterIn);
193       gras_socket_close(measIn);
194       RETHROW0("Unable to conduct the experiment: %s");
195     }
196     DEBUG0("Experiment done");
197
198     *sec = gras_os_time() - *sec;
199     *bw = ((double)exp_size) / *sec;
200   } while (*sec < min_duration);
201
202   DEBUG0("This measurement was long enough. Stop peer");
203   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
204
205   free(request_ack);
206   free(request);
207   if (measIn != measMasterIn)
208     gras_socket_close(measIn);
209   gras_socket_close(measMasterIn);
210   gras_socket_close(measOut);
211 }
212
213
214 /* Callback to the "BW handshake" message: 
215    opens a server measurement socket,
216    indicate its port in an "BW handshaked" message,
217    receive the corresponding data on the measurement socket, 
218    close the measurment socket
219
220    sizes are in byte
221 */
222 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
223                             void          *payload) {
224   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
225   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
226   bw_request_t request=*(bw_request_t*)payload;
227   bw_request_t answer;
228   xbt_ex_t e;
229   int port;
230   int tooshort = 1;
231   gras_msg_cb_ctx_t ctx_reask;
232   static xbt_dynar_t msgtwaited=NULL;
233   
234   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
235         gras_socket_peer_name(expeditor),request->host.port,
236         request->buf_size,request->exp_size,request->msg_size);     
237
238   /* Build our answer */
239   answer = xbt_new0(s_bw_request_t,1);
240   
241   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
242     TRY {
243       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
244     } CATCH(e) {
245       measMasterIn = NULL;
246       if (port < 10000)
247         xbt_ex_free(e);
248       else
249         /* FIXME: tell error to remote */
250         RETHROW0("Error encountered while opening a measurement server socket: %s");
251     }
252   }
253    
254   answer->buf_size=request->buf_size;
255   answer->exp_size=request->exp_size;
256   answer->msg_size=request->msg_size;
257   answer->host.port=gras_socket_my_port(measMasterIn);
258
259   TRY {
260     gras_msg_rpcreturn(60,ctx,&answer);
261   } CATCH(e) { 
262     gras_socket_close(measMasterIn);
263     /* FIXME: tell error to remote */
264     RETHROW0("Error encountered while sending the answer: %s");
265   }
266
267
268   /* Don't connect asap to leave time to other side to enter the accept() */
269   TRY {
270     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
271                                      request->host.port,
272                                      request->buf_size,1);
273   } CATCH(e) {
274     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
275              gras_socket_peer_name(expeditor),request->host.port);
276     /* FIXME: tell error to remote */
277   }
278
279   TRY {
280     measIn = gras_socket_meas_accept(measMasterIn);
281     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
282            answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
283   } CATCH(e) {
284     gras_socket_close(measMasterIn);
285     gras_socket_close(measIn);
286     gras_socket_close(measOut);
287     /* FIXME: tell error to remote ? */
288     RETHROW0("Error encountered while opening the meas socket: %s");
289   }
290
291   if (!msgtwaited) {
292     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
293     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
294     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
295   }
296
297   while (tooshort) {
298     void *payload;
299     int msggot;
300     TRY {
301       DEBUG0("Recv / Send the experiment");
302       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
303       gras_socket_meas_send(measOut,120,1,1);
304       DEBUG0("ACK sent");
305     } CATCH(e) {
306       gras_socket_close(measMasterIn);
307       gras_socket_close(measIn);
308       gras_socket_close(measOut);
309       /* FIXME: tell error to remote ? */
310       RETHROW0("Error encountered while receiving the experiment: %s");
311     }
312     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
313     switch(msggot) {
314     case 0: /* BW stop */
315       tooshort = 0;
316       break;
317     case 1: /* BW reask */
318       tooshort = 1;
319       free(request);
320       request = (bw_request_t)payload;
321       VERB0("Return the reasking RPC");
322       gras_msg_rpcreturn(60,ctx_reask,NULL);
323     }
324     gras_msg_cb_ctx_free(ctx_reask);
325   }
326
327   if (measIn != measMasterIn)
328     gras_socket_close(measMasterIn);
329   gras_socket_close(measIn);
330   gras_socket_close(measOut);
331   free(answer);
332   free(request);
333   VERB0("BW experiment done.");
334   return 1;
335 }
336
337 /**
338  * \brief request a bandwidth measurement between two remote hosts
339  *
340  * \arg from_name: Name of the first host 
341  * \arg from_port: port on which the first process is listening for messages
342  * \arg to_name: Name of the second host 
343  * \arg to_port: port on which the second process is listening (for messages, do not 
344  * give a measurement socket here. The needed measurement sockets will be created 
345  * automatically and negociated between the peers)
346  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
347  * \arg exp_size: Total size of data sent across the network
348  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
349  * \arg sec: where the result (in seconds) should be stored.
350  * \arg bw: observed Bandwidth (in byte/s)
351  *
352  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
353  * This call is blocking until the end of the experiment.
354  *
355  * Results are reported in last args, and sizes are in bytes.
356  */
357 void amok_bw_request(const char* from_name,unsigned int from_port,
358                      const char* to_name,unsigned int to_port,
359                      unsigned long int buf_size,
360                      unsigned long int exp_size,
361                      unsigned long int msg_size,
362                      double min_duration,
363              /*OUT*/ double *sec, double*bw) {
364   
365   gras_socket_t sock;
366   /* The request */
367   bw_request_t request;
368   bw_res_t result;
369
370   request=xbt_new0(s_bw_request_t,1);
371   request->buf_size=buf_size;
372   request->exp_size=exp_size;
373   request->msg_size=msg_size;
374   request->min_duration = min_duration;
375
376   request->host.name = (char*)to_name;
377   request->host.port = to_port;
378
379   sock = gras_socket_client(from_name,from_port);
380   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
381
382   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
383   
384   if (sec)
385     *sec=result->sec;
386   if (bw)
387     *bw =result->bw;
388
389   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
390         from_name,from_port, to_name,to_port,
391         result->sec,((double)result->bw)/1024.0);
392
393   gras_socket_close(sock);
394   free(result);
395   free(request);
396 }
397
398 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
399                           void            *payload) {
400                           
401   /* specification of the test to run, and our answer */
402   bw_request_t request = *(bw_request_t*)payload;
403   bw_res_t result = xbt_new0(s_bw_res_t,1);
404   gras_socket_t peer,asker;
405
406   asker=gras_msg_cb_ctx_from(ctx);
407   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
408         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
409         request->host.name,request->host.port);
410   peer = gras_socket_client(request->host.name,request->host.port);
411   amok_bw_test(peer,
412                request->buf_size,request->exp_size,request->msg_size,
413                request->min_duration,
414                &(result->sec),&(result->bw));
415
416   gras_msg_rpcreturn(240,ctx,&result);
417
418   gras_os_sleep(1);
419   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
420   free(request->host.name);
421   free(request);
422   free(result);
423   
424   return 1;
425 }
426
427 /** \brief builds a matrix of results of bandwidth measurement */
428 double * amok_bw_matrix(xbt_dynar_t hosts,
429                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
430                         double min_duration) { 
431   double sec;
432   /* construction of matrices for bandwith and latency */
433
434
435   int i,j,len=xbt_dynar_length(hosts);
436
437   double *matrix_res = xbt_new0(double, len*len);
438   xbt_host_t h1,h2;
439
440   xbt_dynar_foreach (hosts,i,h1) {
441     xbt_dynar_foreach (hosts,j,h2) {
442       if (i!=j) {
443         /* Mesurements of Bandwidth */
444         amok_bw_request(h1->name,h1->port,h2->name,h2->port,
445                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
446                         &sec,&matrix_res[i*len + j]);
447       } 
448     }
449   }
450   return matrix_res;
451 }