Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
leak plugs
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"exp_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register(gras_msgtype_by_name("BW request"),
90                    &amok_bw_cb_bw_request);
91   gras_cb_register(gras_msgtype_by_name("BW handshake"),
92                    &amok_bw_cb_bw_handshake);
93 }
94 void amok_bw_bw_leave() {
95   gras_cb_unregister(gras_msgtype_by_name("BW request"),
96                      &amok_bw_cb_bw_request);
97   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98                      &amok_bw_cb_bw_handshake);
99 }
100
101 /**
102  * \brief bandwidth measurement between localhost and \e peer
103  * 
104  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
105  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
106  * \arg exp_size: Total size of data sent across the network
107  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
108  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110  * \arg bw: observed Bandwidth (in byte/s) 
111  *
112  * Conduct a bandwidth test from the local process to the given peer.
113  * This call is blocking until the end of the experiment.
114  *
115  * If the asked experiment lasts less than \a min_duration, another one will be
116  * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
117  * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118  * reach the \a min_duration). In that case, the reported bandwidth and
119  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
120  * because we need to malloc a block of this size in RL to conduct the
121  * experiment, and we still don't want to visit the swap.
122  *
123  * Results are reported in last args, and sizes are in byte.
124  */
125 void amok_bw_test(gras_socket_t peer,
126                   unsigned long int buf_size,
127                   unsigned long int exp_size,
128                   unsigned long int msg_size,
129                   double min_duration,
130           /*OUT*/ double *sec, double *bw) {
131
132   /* Measurement sockets for the experiments */
133   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
134   int port;
135   bw_request_t request,request_ack;
136   xbt_ex_t e;
137   int first_pass; 
138   int nb_messages = (exp_size % msg_size == 0) ? 
139     (exp_size / msg_size) : (exp_size / msg_size + 1); 
140   
141   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
142     TRY {
143       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
144     } CATCH(e) {
145       measMasterIn = NULL;
146       if (port == 10000 -1) {
147         RETHROW0("Error caught while opening a measurement socket: %s");
148       } else {
149         xbt_ex_free(e); 
150       }
151     }
152   }
153   
154   request=xbt_new0(s_bw_request_t,1);
155   request->buf_size=buf_size;
156   request->exp_size=msg_size * nb_messages;
157   request->msg_size=msg_size;
158   request->peer.name = NULL;
159   request->peer.port = gras_socket_my_port(measMasterIn);
160   DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)", 
161         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
162         buf_size,request->buf_size);
163
164   TRY {
165     gras_msg_rpccall(peer,15,
166                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
167   } CATCH(e) {
168     RETHROW0("Error encountered while sending the BW request: %s");
169   }
170   measIn = gras_socket_meas_accept(measMasterIn);
171    
172   TRY {
173     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174                                    request_ack->peer.port, 
175                                    request->buf_size,1);
176   } CATCH(e) {
177     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178              gras_socket_peer_name(peer),request_ack->peer.port);
179   }
180   DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
181          request->exp_size, request->msg_size);
182
183   *sec = 0;
184   first_pass = 1;
185   do {
186     if (first_pass == 0) {
187       double meas_duration=*sec;
188       if (*sec != 0.0 ) { 
189         request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
190       } else {
191         request->msg_size = request->msg_size * 4; 
192       }
193              
194       if (request->msg_size > 64*1024*1024)
195         request->msg_size = 64*1024*1024;
196
197       request->exp_size = request->msg_size * nb_messages; 
198
199       VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
200              meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
201       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
202     }
203
204     first_pass = 0;
205     *sec=gras_os_time();
206     TRY {
207       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
208       DEBUG0("Data sent. Wait ACK");
209       gras_socket_meas_recv(measIn,120,1,1);
210     } CATCH(e) {
211       gras_socket_close(measOut);
212       gras_socket_close(measMasterIn);
213       gras_socket_close(measIn);
214       RETHROW0("Unable to conduct the experiment: %s");
215     }
216     *sec = gras_os_time() - *sec;
217     if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
218     DEBUG1("Experiment done ; it took %f sec", *sec);
219     if (*sec <= 0) {
220       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
221     }
222
223   } while (*sec < min_duration);
224
225   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
226          *sec,*bw);
227   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
228
229   free(request_ack);
230   free(request);
231   if (measIn != measMasterIn)
232     gras_socket_close(measIn);
233   gras_socket_close(measMasterIn);
234   gras_socket_close(measOut);
235 }
236
237
238 /* Callback to the "BW handshake" message: 
239    opens a server measurement socket,
240    indicate its port in an "BW handshaked" message,
241    receive the corresponding data on the measurement socket, 
242    close the measurment socket
243
244    sizes are in byte
245 */
246 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
247                             void          *payload) {
248   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
249   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
250   bw_request_t request=*(bw_request_t*)payload;
251   bw_request_t answer;
252   xbt_ex_t e;
253   int port;
254   int tooshort = 1;
255   gras_msg_cb_ctx_t ctx_reask;
256   static xbt_dynar_t msgtwaited=NULL;
257   
258   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
259         gras_socket_peer_name(expeditor),request->peer.port,
260         request->buf_size,request->exp_size,request->msg_size);     
261
262   /* Build our answer */
263   answer = xbt_new0(s_bw_request_t,1);
264   
265   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
266     TRY {
267       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
268     } CATCH(e) {
269       measMasterIn = NULL;
270       if (port < 10000)
271         xbt_ex_free(e);
272       else
273         /* FIXME: tell error to remote */
274         RETHROW0("Error encountered while opening a measurement server socket: %s");
275     }
276   }
277    
278   answer->buf_size=request->buf_size;
279   answer->exp_size=request->exp_size;
280   answer->msg_size=request->msg_size;
281   answer->peer.port=gras_socket_my_port(measMasterIn);
282
283   TRY {
284     gras_msg_rpcreturn(60,ctx,&answer);
285   } CATCH(e) { 
286     gras_socket_close(measMasterIn);
287     /* FIXME: tell error to remote */
288     RETHROW0("Error encountered while sending the answer: %s");
289   }
290
291
292   /* Don't connect asap to leave time to other side to enter the accept() */
293   TRY {
294     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
295                                      request->peer.port,
296                                      request->buf_size,1);
297   } CATCH(e) {
298     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
299              gras_socket_peer_name(expeditor),request->peer.port);
300     /* FIXME: tell error to remote */
301   }
302
303   TRY {
304     measIn = gras_socket_meas_accept(measMasterIn);
305     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
306            answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
307   } CATCH(e) {
308     gras_socket_close(measMasterIn);
309     gras_socket_close(measIn);
310     gras_socket_close(measOut);
311     /* FIXME: tell error to remote ? */
312     RETHROW0("Error encountered while opening the meas socket: %s");
313   }
314
315   if (!msgtwaited) {
316     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
317     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
318     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
319   }
320
321   while (tooshort) {
322     void *payload;
323     int msggot;
324     TRY {
325       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
326       gras_socket_meas_send(measOut,120,1,1);
327     } CATCH(e) {
328       gras_socket_close(measMasterIn);
329       gras_socket_close(measIn);
330       gras_socket_close(measOut);
331       /* FIXME: tell error to remote ? */
332       RETHROW0("Error encountered while receiving the experiment: %s");
333     }
334     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
335     switch(msggot) {
336     case 0: /* BW stop */
337       tooshort = 0;
338       break;
339     case 1: /* BW reask */
340       tooshort = 1;
341       free(request);
342       request = (bw_request_t)payload;
343       VERB0("Return the reasking RPC");
344       gras_msg_rpcreturn(60,ctx_reask,NULL);
345     }
346     gras_msg_cb_ctx_free(ctx_reask);
347   }
348
349   if (measIn != measMasterIn)
350     gras_socket_close(measMasterIn);
351   gras_socket_close(measIn);
352   gras_socket_close(measOut);
353   free(answer);
354   free(request);
355   VERB0("BW experiment done.");
356   return 1;
357 }
358
359 /**
360  * \brief request a bandwidth measurement between two remote peers
361  *
362  * \arg from_name: Name of the first peer 
363  * \arg from_port: port on which the first process is listening for messages
364  * \arg to_name: Name of the second peer 
365  * \arg to_port: port on which the second process is listening (for messages, do not 
366  * give a measurement socket here. The needed measurement sockets will be created 
367  * automatically and negociated between the peers)
368  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
369  * \arg exp_size: Total size of data sent across the network
370  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
371  * \arg sec: where the result (in seconds) should be stored.
372  * \arg bw: observed Bandwidth (in byte/s)
373  *
374  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
375  * This call is blocking until the end of the experiment.
376  *
377  * Results are reported in last args, and sizes are in bytes.
378  */
379 void amok_bw_request(const char* from_name,unsigned int from_port,
380                      const char* to_name,unsigned int to_port,
381                      unsigned long int buf_size,
382                      unsigned long int exp_size,
383                      unsigned long int msg_size,
384                      double min_duration,
385              /*OUT*/ double *sec, double*bw) {
386   
387   gras_socket_t sock;
388   /* The request */
389   bw_request_t request;
390   bw_res_t result;
391   request=xbt_new0(s_bw_request_t,1);
392   request->buf_size=buf_size;
393   request->exp_size=exp_size;
394   request->msg_size=msg_size;
395   request->min_duration = min_duration;
396
397
398   request->peer.name = (char*)to_name;
399   request->peer.port = to_port;
400
401
402   sock = gras_socket_client(from_name,from_port);
403  
404     
405  
406   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
407   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
408
409   if (sec)
410     *sec=result->sec;
411   if (bw)
412     *bw =result->bw;
413
414   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
415         from_name,from_port, to_name,to_port,
416         result->sec,((double)result->bw)/1024.0);
417
418   gras_socket_close(sock);
419   free(result);
420   free(request);
421 }
422
423 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
424                           void            *payload) {
425                           
426   /* specification of the test to run, and our answer */
427   bw_request_t request = *(bw_request_t*)payload;
428   bw_res_t result = xbt_new0(s_bw_res_t,1);
429   gras_socket_t peer,asker;
430
431   asker=gras_msg_cb_ctx_from(ctx);
432   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
433         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
434
435         request->peer.name,request->peer.port);
436   peer = gras_socket_client(request->peer.name,request->peer.port);
437   amok_bw_test(peer,
438                request->buf_size,request->exp_size,request->msg_size,
439                request->min_duration,
440                &(result->sec),&(result->bw));
441  
442   gras_msg_rpcreturn(240,ctx,&result);
443
444   gras_os_sleep(1);
445   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
446   free(request->peer.name);
447   free(request);
448   free(result);
449   
450   return 1;
451 }
452
453 /** \brief builds a matrix of results of bandwidth measurement */
454 double * amok_bw_matrix(xbt_dynar_t peers,
455                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
456                         double min_duration) { 
457   double sec;
458   /* construction of matrices for bandwith and latency */
459
460
461   int i,j,len=xbt_dynar_length(peers);
462
463   double *matrix_res = xbt_new0(double, len*len);
464   xbt_peer_t p1,p2;
465
466   xbt_dynar_foreach (peers,i,p1) {
467     xbt_dynar_foreach (peers,j,p2) {
468       if (i!=j) {
469         /* Mesurements of Bandwidth */
470         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
471                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
472                         &sec,&matrix_res[i*len + j]);
473       } 
474     }
475   }
476   return matrix_res;
477 }