Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make AMOK bandwidth tests more robust.
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"exp_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register(gras_msgtype_by_name("BW request"),
90                    &amok_bw_cb_bw_request);
91   gras_cb_register(gras_msgtype_by_name("BW handshake"),
92                    &amok_bw_cb_bw_handshake);
93 }
94 void amok_bw_bw_leave() {
95   gras_cb_unregister(gras_msgtype_by_name("BW request"),
96                      &amok_bw_cb_bw_request);
97   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98                      &amok_bw_cb_bw_handshake);
99 }
100
101 /**
102  * \brief bandwidth measurement between localhost and \e peer
103  * 
104  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
105  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
106  * \arg exp_size: Total size of data sent across the network
107  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
108  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110  * \arg bw: observed Bandwidth (in byte/s) 
111  *
112  * Conduct a bandwidth test from the local process to the given peer.
113  * This call is blocking until the end of the experiment.
114  *
115  * If the asked experiment lasts less than \a min_duration, another one will be
116  * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
117  * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118  * reach the \a min_duration). In that case, the reported bandwidth and
119  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
120  * because we need to malloc a block of this size in RL to conduct the
121  * experiment, and we still don't want to visit the swap.
122  *
123  * Results are reported in last args, and sizes are in byte.
124  */
125 void amok_bw_test(gras_socket_t peer,
126                   unsigned long int buf_size,
127                   unsigned long int exp_size,
128                   unsigned long int msg_size,
129                   double min_duration,
130           /*OUT*/ double *sec, double *bw) {
131
132   /* Measurement sockets for the experiments */
133   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
134   int port;
135   bw_request_t request,request_ack;
136   xbt_ex_t e;
137   int first_pass; 
138   
139   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
140     TRY {
141       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
142     } CATCH(e) {
143       measMasterIn = NULL;
144       if (port == 10000 -1) {
145         RETHROW0("Error caught while opening a measurement socket: %s");
146       } else {
147         xbt_ex_free(e); 
148       }
149     }
150   }
151   
152   request=xbt_new0(s_bw_request_t,1);
153   request->buf_size=buf_size;
154   request->exp_size=exp_size;
155   request->msg_size=msg_size;
156   request->peer.name = NULL;
157   request->peer.port = gras_socket_my_port(measMasterIn);
158   DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)", 
159         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
160         buf_size,request->buf_size);
161
162   TRY {
163     gras_msg_rpccall(peer,15,
164                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
165   } CATCH(e) {
166     RETHROW0("Error encountered while sending the BW request: %s");
167   }
168   measIn = gras_socket_meas_accept(measMasterIn);
169    
170   TRY {
171     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
172                                    request_ack->peer.port, 
173                                    request->buf_size,1);
174   } CATCH(e) {
175     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
176              gras_socket_peer_name(peer),request_ack->peer.port);
177   }
178   DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
179          request->exp_size, request->msg_size);
180
181   *sec = 0;
182   first_pass = 1;
183   do {
184     if (first_pass == 0) {
185       double meas_duration=*sec;
186       if (*sec != 0.0 ) { 
187         request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
188         request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
189       } else {
190         request->exp_size = request->exp_size * 4; 
191         request->msg_size = request->msg_size * 4; 
192       }
193              
194       if (request->msg_size > 64*1024*1024)
195         request->msg_size = 64*1024*1024;
196
197       VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
198              meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
199       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
200     }
201
202     first_pass = 0;
203     *sec=gras_os_time();
204     TRY {
205       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
206       DEBUG0("Data sent. Wait ACK");
207       gras_socket_meas_recv(measIn,120,1,1);
208     } CATCH(e) {
209       gras_socket_close(measOut);
210       gras_socket_close(measMasterIn);
211       gras_socket_close(measIn);
212       RETHROW0("Unable to conduct the experiment: %s");
213     }
214     *sec = gras_os_time() - *sec;
215     if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
216     DEBUG1("Experiment done ; it took %f sec", *sec);
217     if (*sec <= 0) {
218       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
219     }
220
221   } while (*sec < min_duration);
222
223   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
224          *sec,*bw);
225   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
226
227   free(request_ack);
228   free(request);
229   if (measIn != measMasterIn)
230     gras_socket_close(measIn);
231   gras_socket_close(measMasterIn);
232   gras_socket_close(measOut);
233 }
234
235
236 /* Callback to the "BW handshake" message: 
237    opens a server measurement socket,
238    indicate its port in an "BW handshaked" message,
239    receive the corresponding data on the measurement socket, 
240    close the measurment socket
241
242    sizes are in byte
243 */
244 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
245                             void          *payload) {
246   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
247   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
248   bw_request_t request=*(bw_request_t*)payload;
249   bw_request_t answer;
250   xbt_ex_t e;
251   int port;
252   int tooshort = 1;
253   gras_msg_cb_ctx_t ctx_reask;
254   static xbt_dynar_t msgtwaited=NULL;
255   
256   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
257         gras_socket_peer_name(expeditor),request->peer.port,
258         request->buf_size,request->exp_size,request->msg_size);     
259
260   /* Build our answer */
261   answer = xbt_new0(s_bw_request_t,1);
262   
263   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
264     TRY {
265       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
266     } CATCH(e) {
267       measMasterIn = NULL;
268       if (port < 10000)
269         xbt_ex_free(e);
270       else
271         /* FIXME: tell error to remote */
272         RETHROW0("Error encountered while opening a measurement server socket: %s");
273     }
274   }
275    
276   answer->buf_size=request->buf_size;
277   answer->exp_size=request->exp_size;
278   answer->msg_size=request->msg_size;
279   answer->peer.port=gras_socket_my_port(measMasterIn);
280
281   TRY {
282     gras_msg_rpcreturn(60,ctx,&answer);
283   } CATCH(e) { 
284     gras_socket_close(measMasterIn);
285     /* FIXME: tell error to remote */
286     RETHROW0("Error encountered while sending the answer: %s");
287   }
288
289
290   /* Don't connect asap to leave time to other side to enter the accept() */
291   TRY {
292     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
293                                      request->peer.port,
294                                      request->buf_size,1);
295   } CATCH(e) {
296     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
297              gras_socket_peer_name(expeditor),request->peer.port);
298     /* FIXME: tell error to remote */
299   }
300
301   TRY {
302     measIn = gras_socket_meas_accept(measMasterIn);
303     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
304            answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
305   } CATCH(e) {
306     gras_socket_close(measMasterIn);
307     gras_socket_close(measIn);
308     gras_socket_close(measOut);
309     /* FIXME: tell error to remote ? */
310     RETHROW0("Error encountered while opening the meas socket: %s");
311   }
312
313   if (!msgtwaited) {
314     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
315     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
316     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
317   }
318
319   while (tooshort) {
320     void *payload;
321     int msggot;
322     TRY {
323       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
324       gras_socket_meas_send(measOut,120,1,1);
325     } CATCH(e) {
326       gras_socket_close(measMasterIn);
327       gras_socket_close(measIn);
328       gras_socket_close(measOut);
329       /* FIXME: tell error to remote ? */
330       RETHROW0("Error encountered while receiving the experiment: %s");
331     }
332     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
333     switch(msggot) {
334     case 0: /* BW stop */
335       tooshort = 0;
336       break;
337     case 1: /* BW reask */
338       tooshort = 1;
339       free(request);
340       request = (bw_request_t)payload;
341       VERB0("Return the reasking RPC");
342       gras_msg_rpcreturn(60,ctx_reask,NULL);
343     }
344     gras_msg_cb_ctx_free(ctx_reask);
345   }
346
347   if (measIn != measMasterIn)
348     gras_socket_close(measMasterIn);
349   gras_socket_close(measIn);
350   gras_socket_close(measOut);
351   free(answer);
352   free(request);
353   VERB0("BW experiment done.");
354   return 1;
355 }
356
357 /**
358  * \brief request a bandwidth measurement between two remote peers
359  *
360  * \arg from_name: Name of the first peer 
361  * \arg from_port: port on which the first process is listening for messages
362  * \arg to_name: Name of the second peer 
363  * \arg to_port: port on which the second process is listening (for messages, do not 
364  * give a measurement socket here. The needed measurement sockets will be created 
365  * automatically and negociated between the peers)
366  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
367  * \arg exp_size: Total size of data sent across the network
368  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
369  * \arg sec: where the result (in seconds) should be stored.
370  * \arg bw: observed Bandwidth (in byte/s)
371  *
372  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
373  * This call is blocking until the end of the experiment.
374  *
375  * Results are reported in last args, and sizes are in bytes.
376  */
377 void amok_bw_request(const char* from_name,unsigned int from_port,
378                      const char* to_name,unsigned int to_port,
379                      unsigned long int buf_size,
380                      unsigned long int exp_size,
381                      unsigned long int msg_size,
382                      double min_duration,
383              /*OUT*/ double *sec, double*bw) {
384   
385   gras_socket_t sock;
386   /* The request */
387   bw_request_t request;
388   bw_res_t result;
389   request=xbt_new0(s_bw_request_t,1);
390   request->buf_size=buf_size;
391   request->exp_size=exp_size;
392   request->msg_size=msg_size;
393   request->min_duration = min_duration;
394
395
396   request->peer.name = (char*)to_name;
397   request->peer.port = to_port;
398
399
400   sock = gras_socket_client(from_name,from_port);
401  
402     
403  
404   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
405   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
406
407   if (sec)
408     *sec=result->sec;
409   if (bw)
410     *bw =result->bw;
411
412   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
413         from_name,from_port, to_name,to_port,
414         result->sec,((double)result->bw)/1024.0);
415
416   gras_socket_close(sock);
417   free(result);
418   free(request);
419 }
420
421 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
422                           void            *payload) {
423                           
424   /* specification of the test to run, and our answer */
425   bw_request_t request = *(bw_request_t*)payload;
426   bw_res_t result = xbt_new0(s_bw_res_t,1);
427   gras_socket_t peer,asker;
428
429   asker=gras_msg_cb_ctx_from(ctx);
430   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
431         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
432
433         request->peer.name,request->peer.port);
434   peer = gras_socket_client(request->peer.name,request->peer.port);
435   amok_bw_test(peer,
436                request->buf_size,request->exp_size,request->msg_size,
437                request->min_duration,
438                &(result->sec),&(result->bw));
439  
440   gras_msg_rpcreturn(240,ctx,&result);
441
442   gras_os_sleep(1);
443   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
444   free(request->peer.name);
445   free(request);
446   free(result);
447   
448   return 1;
449 }
450
451 /** \brief builds a matrix of results of bandwidth measurement */
452 double * amok_bw_matrix(xbt_dynar_t peers,
453                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
454                         double min_duration) { 
455   double sec;
456   /* construction of matrices for bandwith and latency */
457
458
459   int i,j,len=xbt_dynar_length(peers);
460
461   double *matrix_res = xbt_new0(double, len*len);
462   xbt_peer_t p1,p2;
463
464   xbt_dynar_foreach (peers,i,p1) {
465     xbt_dynar_foreach (peers,j,p2) {
466       if (i!=j) {
467         /* Mesurements of Bandwidth */
468         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
469                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
470                         &sec,&matrix_res[i*len + j]);
471       } 
472     }
473   }
474   return matrix_res;
475 }