Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cap the increment of the experience size to avoid choosing a value stupidly too large...
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"exp_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register(gras_msgtype_by_name("BW request"),
90                    &amok_bw_cb_bw_request);
91   gras_cb_register(gras_msgtype_by_name("BW handshake"),
92                    &amok_bw_cb_bw_handshake);
93 }
94 void amok_bw_bw_leave() {
95   gras_cb_unregister(gras_msgtype_by_name("BW request"),
96                      &amok_bw_cb_bw_request);
97   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98                      &amok_bw_cb_bw_handshake);
99 }
100
101 /**
102  * \brief bandwidth measurement between localhost and \e peer
103  * 
104  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
105  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
106  * \arg exp_size: Total size of data sent across the network
107  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
108  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110  * \arg bw: observed Bandwidth (in byte/s) 
111  *
112  * Conduct a bandwidth test from the local process to the given peer.
113  * This call is blocking until the end of the experiment.
114  *
115  * If the asked experiment lasts less than \a min_duration, another one will be
116  * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
117  * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118  * reach the \a min_duration). In that case, the reported bandwidth and
119  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
120  * because we need to malloc a block of this size in RL to conduct the
121  * experiment, and we still don't want to visit the swap.
122  *
123  * Results are reported in last args, and sizes are in byte.
124  */
125 void amok_bw_test(gras_socket_t peer,
126                   unsigned long int buf_size,
127                   unsigned long int exp_size,
128                   unsigned long int msg_size,
129                   double min_duration,
130           /*OUT*/ double *sec, double *bw) {
131
132   /* Measurement sockets for the experiments */
133   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
134   int port;
135   bw_request_t request,request_ack;
136   xbt_ex_t e;
137   int first_pass; 
138   int nb_messages = (exp_size % msg_size == 0) ? 
139     (exp_size / msg_size) : (exp_size / msg_size + 1); 
140   
141   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
142     TRY {
143       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
144     } CATCH(e) {
145       measMasterIn = NULL;
146       if (port == 10000 -1) {
147         RETHROW0("Error caught while opening a measurement socket: %s");
148       } else {
149         xbt_ex_free(e); 
150       }
151     }
152   }
153   
154   request=xbt_new0(s_bw_request_t,1);
155   request->buf_size=buf_size;
156   request->exp_size=msg_size * nb_messages;
157   request->msg_size=msg_size;
158   request->peer.name = NULL;
159   request->peer.port = gras_socket_my_port(measMasterIn);
160   DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)", 
161         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
162         buf_size,request->buf_size);
163
164   TRY {
165     gras_msg_rpccall(peer,15,
166                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
167   } CATCH(e) {
168     RETHROW0("Error encountered while sending the BW request: %s");
169   }
170   measIn = gras_socket_meas_accept(measMasterIn);
171    
172   TRY {
173     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174                                    request_ack->peer.port, 
175                                    request->buf_size,1);
176   } CATCH(e) {
177     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178              gras_socket_peer_name(peer),request_ack->peer.port);
179   }
180   DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
181          request->exp_size, request->msg_size);
182
183   *sec = 0;
184   first_pass = 1;
185   do {
186     if (first_pass == 0) {
187       double meas_duration=*sec;
188       double increase;
189       if (*sec != 0.0 ) {
190          increase = (min_duration / meas_duration) * 1.1;
191       } else {
192          increase = 4; 
193       }
194       /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
195       if (increase > 20)
196          increase = 20; 
197             
198       request->msg_size = request->msg_size * increase;
199
200       /* Do not do too large experiments messages or the sensors will start to swap to store one of them */
201       if (request->msg_size > 64*1024*1024)
202         request->msg_size = 64*1024*1024;
203
204       if (request->exp_size > request->msg_size * nb_messages) 
205          CRITICAL0("overflow on the experiment size! You must have a *really* fat pipe. Please fix your platform");
206       else
207          request->exp_size = request->msg_size * nb_messages;
208
209       VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%lu msg_size=%lu (got %fkb/s)",
210              meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
211       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
212     }
213
214     first_pass = 0;
215     *sec=gras_os_time();
216     TRY {
217       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
218       DEBUG0("Data sent. Wait ACK");
219       gras_socket_meas_recv(measIn,120,1,1);
220     } CATCH(e) {
221       gras_socket_close(measOut);
222       gras_socket_close(measMasterIn);
223       gras_socket_close(measIn);
224       RETHROW0("Unable to conduct the experiment: %s");
225     }
226     *sec = gras_os_time() - *sec;
227     if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
228     DEBUG1("Experiment done ; it took %f sec", *sec);
229     if (*sec <= 0) {
230       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
231     }
232
233   } while (*sec < min_duration);
234
235   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
236          *sec,*bw);
237   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
238
239   free(request_ack);
240   free(request);
241   if (measIn != measMasterIn)
242     gras_socket_close(measIn);
243   gras_socket_close(measMasterIn);
244   gras_socket_close(measOut);
245 }
246
247
248 /* Callback to the "BW handshake" message: 
249    opens a server measurement socket,
250    indicate its port in an "BW handshaked" message,
251    receive the corresponding data on the measurement socket, 
252    close the measurment socket
253
254    sizes are in byte
255 */
256 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
257                             void          *payload) {
258   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
259   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
260   bw_request_t request=*(bw_request_t*)payload;
261   bw_request_t answer;
262   xbt_ex_t e;
263   int port;
264   int tooshort = 1;
265   gras_msg_cb_ctx_t ctx_reask;
266   static xbt_dynar_t msgtwaited=NULL;
267   
268   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
269         gras_socket_peer_name(expeditor),request->peer.port,
270         request->buf_size,request->exp_size,request->msg_size);     
271
272   /* Build our answer */
273   answer = xbt_new0(s_bw_request_t,1);
274   
275   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
276     TRY {
277       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
278     } CATCH(e) {
279       measMasterIn = NULL;
280       if (port < 10000)
281         xbt_ex_free(e);
282       else
283         /* FIXME: tell error to remote */
284         RETHROW0("Error encountered while opening a measurement server socket: %s");
285     }
286   }
287    
288   answer->buf_size=request->buf_size;
289   answer->exp_size=request->exp_size;
290   answer->msg_size=request->msg_size;
291   answer->peer.port=gras_socket_my_port(measMasterIn);
292
293   TRY {
294     gras_msg_rpcreturn(60,ctx,&answer);
295   } CATCH(e) { 
296     gras_socket_close(measMasterIn);
297     /* FIXME: tell error to remote */
298     RETHROW0("Error encountered while sending the answer: %s");
299   }
300
301
302   /* Don't connect asap to leave time to other side to enter the accept() */
303   TRY {
304     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
305                                      request->peer.port,
306                                      request->buf_size,1);
307   } CATCH(e) {
308     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
309              gras_socket_peer_name(expeditor),request->peer.port);
310     /* FIXME: tell error to remote */
311   }
312
313   TRY {
314     measIn = gras_socket_meas_accept(measMasterIn);
315     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
316            answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
317   } CATCH(e) {
318     gras_socket_close(measMasterIn);
319     gras_socket_close(measIn);
320     gras_socket_close(measOut);
321     /* FIXME: tell error to remote ? */
322     RETHROW0("Error encountered while opening the meas socket: %s");
323   }
324
325   if (!msgtwaited) {
326     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
327     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
328     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
329   }
330
331   while (tooshort) {
332     void *payload;
333     int msggot;
334     TRY {
335       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
336       gras_socket_meas_send(measOut,120,1,1);
337     } CATCH(e) {
338       gras_socket_close(measMasterIn);
339       gras_socket_close(measIn);
340       gras_socket_close(measOut);
341       /* FIXME: tell error to remote ? */
342       RETHROW0("Error encountered while receiving the experiment: %s");
343     }
344     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
345     switch(msggot) {
346     case 0: /* BW stop */
347       tooshort = 0;
348       break;
349     case 1: /* BW reask */
350       tooshort = 1;
351       free(request);
352       request = (bw_request_t)payload;
353       VERB0("Return the reasking RPC");
354       gras_msg_rpcreturn(60,ctx_reask,NULL);
355     }
356     gras_msg_cb_ctx_free(ctx_reask);
357   }
358
359   if (measIn != measMasterIn)
360     gras_socket_close(measMasterIn);
361   gras_socket_close(measIn);
362   gras_socket_close(measOut);
363   free(answer);
364   free(request);
365   VERB0("BW experiment done.");
366   return 1;
367 }
368
369 /**
370  * \brief request a bandwidth measurement between two remote peers
371  *
372  * \arg from_name: Name of the first peer 
373  * \arg from_port: port on which the first process is listening for messages
374  * \arg to_name: Name of the second peer 
375  * \arg to_port: port on which the second process is listening (for messages, do not 
376  * give a measurement socket here. The needed measurement sockets will be created 
377  * automatically and negociated between the peers)
378  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
379  * \arg exp_size: Total size of data sent across the network
380  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
381  * \arg sec: where the result (in seconds) should be stored.
382  * \arg bw: observed Bandwidth (in byte/s)
383  *
384  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
385  * This call is blocking until the end of the experiment.
386  *
387  * Results are reported in last args, and sizes are in bytes.
388  */
389 void amok_bw_request(const char* from_name,unsigned int from_port,
390                      const char* to_name,unsigned int to_port,
391                      unsigned long int buf_size,
392                      unsigned long int exp_size,
393                      unsigned long int msg_size,
394                      double min_duration,
395              /*OUT*/ double *sec, double*bw) {
396   
397   gras_socket_t sock;
398   /* The request */
399   bw_request_t request;
400   bw_res_t result;
401   request=xbt_new0(s_bw_request_t,1);
402   request->buf_size=buf_size;
403   request->exp_size=exp_size;
404   request->msg_size=msg_size;
405   request->min_duration = min_duration;
406
407
408   request->peer.name = (char*)to_name;
409   request->peer.port = to_port;
410
411
412   sock = gras_socket_client(from_name,from_port);
413  
414     
415  
416   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
417   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
418
419   if (sec)
420     *sec=result->sec;
421   if (bw)
422     *bw =result->bw;
423
424   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
425         from_name,from_port, to_name,to_port,
426         result->sec,((double)result->bw)/1024.0);
427
428   gras_socket_close(sock);
429   free(result);
430   free(request);
431 }
432
433 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
434                           void            *payload) {
435                           
436   /* specification of the test to run, and our answer */
437   bw_request_t request = *(bw_request_t*)payload;
438   bw_res_t result = xbt_new0(s_bw_res_t,1);
439   gras_socket_t peer,asker;
440
441   asker=gras_msg_cb_ctx_from(ctx);
442   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
443         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
444
445         request->peer.name,request->peer.port);
446   peer = gras_socket_client(request->peer.name,request->peer.port);
447   amok_bw_test(peer,
448                request->buf_size,request->exp_size,request->msg_size,
449                request->min_duration,
450                &(result->sec),&(result->bw));
451  
452   gras_msg_rpcreturn(240,ctx,&result);
453
454   gras_os_sleep(1);
455   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
456   free(request->peer.name);
457   free(request);
458   free(result);
459   
460   return 1;
461 }
462
463 /** \brief builds a matrix of results of bandwidth measurement */
464 double * amok_bw_matrix(xbt_dynar_t peers,
465                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
466                         double min_duration) { 
467   double sec;
468   /* construction of matrices for bandwith and latency */
469
470
471   int i,j,len=xbt_dynar_length(peers);
472
473   double *matrix_res = xbt_new0(double, len*len);
474   xbt_peer_t p1,p2;
475
476   xbt_dynar_foreach (peers,i,p1) {
477     xbt_dynar_foreach (peers,j,p2) {
478       if (i!=j) {
479         /* Mesurements of Bandwidth */
480         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
481                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
482                         &sec,&matrix_res[i*len + j]);
483       } 
484     }
485   }
486   return matrix_res;
487 }