Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
912cf65427dc0424265d8a95728eb36bb6d10bd2
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"exp_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register(gras_msgtype_by_name("BW request"),
90                    &amok_bw_cb_bw_request);
91   gras_cb_register(gras_msgtype_by_name("BW handshake"),
92                    &amok_bw_cb_bw_handshake);
93 }
94 void amok_bw_bw_leave() {
95   gras_cb_unregister(gras_msgtype_by_name("BW request"),
96                      &amok_bw_cb_bw_request);
97   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98                      &amok_bw_cb_bw_handshake);
99 }
100
101 /**
102  * \brief bandwidth measurement between localhost and \e peer
103  * 
104  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
105  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
106  * \arg exp_size: Total size of data sent across the network
107  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
108  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110  * \arg bw: observed Bandwidth (in byte/s) 
111  *
112  * Conduct a bandwidth test from the local process to the given peer.
113  * This call is blocking until the end of the experiment.
114  *
115  * If the asked experiment lasts less than \a min_duration, another one will be
116  * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
117  * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118  * reach the \a min_duration). In that case, the reported bandwidth and
119  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
120  * because we need to malloc a block of this size in RL to conduct the
121  * experiment, and we still don't want to visit the swap.
122  *
123  * Results are reported in last args, and sizes are in byte.
124  */
125 void amok_bw_test(gras_socket_t peer,
126                   unsigned long int buf_size,
127                   unsigned long int exp_size,
128                   unsigned long int msg_size,
129                   double min_duration,
130           /*OUT*/ double *sec, double *bw) {
131
132   /* Measurement sockets for the experiments */
133   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
134   int port;
135   bw_request_t request,request_ack;
136   xbt_ex_t e;
137   int first_pass; 
138   int nb_messages = (exp_size % msg_size == 0) ? 
139     (exp_size / msg_size) : (exp_size / msg_size + 1); 
140   
141   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
142     TRY {
143       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
144     } CATCH(e) {
145       measMasterIn = NULL;
146       if (port == 10000 -1) {
147         RETHROW0("Error caught while opening a measurement socket: %s");
148       } else {
149         xbt_ex_free(e); 
150       }
151     }
152   }
153   
154   request=xbt_new0(s_bw_request_t,1);
155   request->buf_size=buf_size;
156   request->exp_size=msg_size * nb_messages;
157   request->msg_size=msg_size;
158   request->peer.name = NULL;
159   request->peer.port = gras_socket_my_port(measMasterIn);
160   DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)", 
161         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
162         buf_size,request->buf_size);
163
164   TRY {
165     gras_msg_rpccall(peer,15,
166                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
167   } CATCH(e) {
168     RETHROW0("Error encountered while sending the BW request: %s");
169   }
170   measIn = gras_socket_meas_accept(measMasterIn);
171    
172   TRY {
173     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174                                    request_ack->peer.port, 
175                                    request->buf_size,1);
176   } CATCH(e) {
177     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178              gras_socket_peer_name(peer),request_ack->peer.port);
179   }
180   DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
181          request->exp_size, request->msg_size);
182
183   *sec = 0;
184   first_pass = 1;
185   do {
186     if (first_pass == 0) {
187       double meas_duration=*sec;
188       double increase;
189       if (*sec != 0.0 ) {
190          increase = (min_duration / meas_duration) * 1.1;
191       } else {
192          increase = 4; 
193       }
194       /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
195       if (increase > 20)
196          increase = 20; 
197             
198       request->msg_size = request->msg_size * increase;
199
200       /* Do not do too large experiments messages or the sensors 
201          will start to swap to store one of them.
202          And then increase the number of messages to compensate */
203       if (request->msg_size > 64*1024*1024) {
204         nb_messages = ( (request->msg_size / ((double)64*1024*1024)) 
205                         * nb_messages ) + 1; 
206         request->msg_size = 64*1024*1024;
207       }
208
209       VERB6("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%lu msg_size=%lu (nb_messages=%d) (got %fkb/s)",
210             meas_duration, min_duration, 
211             request->exp_size, request->msg_size, nb_messages, 
212             ((double)request->exp_size) / *sec/1024);
213
214       if (request->exp_size > request->msg_size * nb_messages) 
215          CRITICAL0("overflow on the experiment size! You must have a *really* fat pipe. Please fix your platform");
216       else
217          request->exp_size = request->msg_size * nb_messages;
218
219
220       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
221     }
222
223     first_pass = 0;
224     *sec=gras_os_time();
225     TRY {
226       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
227       DEBUG0("Data sent. Wait ACK");
228       gras_socket_meas_recv(measIn,120,1,1);
229     } CATCH(e) {
230       gras_socket_close(measOut);
231       gras_socket_close(measMasterIn);
232       gras_socket_close(measIn);
233       RETHROW0("Unable to conduct the experiment: %s");
234     }
235     *sec = gras_os_time() - *sec;
236     if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
237     DEBUG1("Experiment done ; it took %f sec", *sec);
238     if (*sec <= 0) {
239       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
240     }
241
242   } while (*sec < min_duration);
243
244   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
245          *sec,*bw);
246   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
247
248   free(request_ack);
249   free(request);
250   if (measIn != measMasterIn)
251     gras_socket_close(measIn);
252   gras_socket_close(measMasterIn);
253   gras_socket_close(measOut);
254 }
255
256
257 /* Callback to the "BW handshake" message: 
258    opens a server measurement socket,
259    indicate its port in an "BW handshaked" message,
260    receive the corresponding data on the measurement socket, 
261    close the measurment socket
262
263    sizes are in byte
264 */
265 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
266                             void          *payload) {
267   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
268   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
269   bw_request_t request=*(bw_request_t*)payload;
270   bw_request_t answer;
271   xbt_ex_t e;
272   int port;
273   int tooshort = 1;
274   gras_msg_cb_ctx_t ctx_reask;
275   static xbt_dynar_t msgtwaited=NULL;
276   
277   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
278         gras_socket_peer_name(expeditor),request->peer.port,
279         request->buf_size,request->exp_size,request->msg_size);     
280
281   /* Build our answer */
282   answer = xbt_new0(s_bw_request_t,1);
283   
284   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
285     TRY {
286       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
287     } CATCH(e) {
288       measMasterIn = NULL;
289       if (port < 10000)
290         xbt_ex_free(e);
291       else
292         /* FIXME: tell error to remote */
293         RETHROW0("Error encountered while opening a measurement server socket: %s");
294     }
295   }
296    
297   answer->buf_size=request->buf_size;
298   answer->exp_size=request->exp_size;
299   answer->msg_size=request->msg_size;
300   answer->peer.port=gras_socket_my_port(measMasterIn);
301
302   TRY {
303     gras_msg_rpcreturn(60,ctx,&answer);
304   } CATCH(e) { 
305     gras_socket_close(measMasterIn);
306     /* FIXME: tell error to remote */
307     RETHROW0("Error encountered while sending the answer: %s");
308   }
309
310
311   /* Don't connect asap to leave time to other side to enter the accept() */
312   TRY {
313     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
314                                      request->peer.port,
315                                      request->buf_size,1);
316   } CATCH(e) {
317     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
318              gras_socket_peer_name(expeditor),request->peer.port);
319     /* FIXME: tell error to remote */
320   }
321
322   TRY {
323     measIn = gras_socket_meas_accept(measMasterIn);
324     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
325            answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
326   } CATCH(e) {
327     gras_socket_close(measMasterIn);
328     gras_socket_close(measIn);
329     gras_socket_close(measOut);
330     /* FIXME: tell error to remote ? */
331     RETHROW0("Error encountered while opening the meas socket: %s");
332   }
333
334   if (!msgtwaited) {
335     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
336     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
337     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
338   }
339
340   while (tooshort) {
341     void *payload;
342     int msggot;
343     TRY {
344       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
345       gras_socket_meas_send(measOut,120,1,1);
346     } CATCH(e) {
347       gras_socket_close(measMasterIn);
348       gras_socket_close(measIn);
349       gras_socket_close(measOut);
350       /* FIXME: tell error to remote ? */
351       RETHROW0("Error encountered while receiving the experiment: %s");
352     }
353     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
354     switch(msggot) {
355     case 0: /* BW stop */
356       tooshort = 0;
357       break;
358     case 1: /* BW reask */
359       tooshort = 1;
360       free(request);
361       request = (bw_request_t)payload;
362       VERB0("Return the reasking RPC");
363       gras_msg_rpcreturn(60,ctx_reask,NULL);
364     }
365     gras_msg_cb_ctx_free(ctx_reask);
366   }
367
368   if (measIn != measMasterIn)
369     gras_socket_close(measMasterIn);
370   gras_socket_close(measIn);
371   gras_socket_close(measOut);
372   free(answer);
373   free(request);
374   VERB0("BW experiment done.");
375   return 1;
376 }
377
378 /**
379  * \brief request a bandwidth measurement between two remote peers
380  *
381  * \arg from_name: Name of the first peer 
382  * \arg from_port: port on which the first process is listening for messages
383  * \arg to_name: Name of the second peer 
384  * \arg to_port: port on which the second process is listening (for messages, do not 
385  * give a measurement socket here. The needed measurement sockets will be created 
386  * automatically and negociated between the peers)
387  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
388  * \arg exp_size: Total size of data sent across the network
389  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
390  * \arg sec: where the result (in seconds) should be stored.
391  * \arg bw: observed Bandwidth (in byte/s)
392  *
393  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
394  * This call is blocking until the end of the experiment.
395  *
396  * Results are reported in last args, and sizes are in bytes.
397  */
398 void amok_bw_request(const char* from_name,unsigned int from_port,
399                      const char* to_name,unsigned int to_port,
400                      unsigned long int buf_size,
401                      unsigned long int exp_size,
402                      unsigned long int msg_size,
403                      double min_duration,
404              /*OUT*/ double *sec, double*bw) {
405   
406   gras_socket_t sock;
407   /* The request */
408   bw_request_t request;
409   bw_res_t result;
410   request=xbt_new0(s_bw_request_t,1);
411   request->buf_size=buf_size;
412   request->exp_size=exp_size;
413   request->msg_size=msg_size;
414   request->min_duration = min_duration;
415
416
417   request->peer.name = (char*)to_name;
418   request->peer.port = to_port;
419
420
421   sock = gras_socket_client(from_name,from_port);
422  
423     
424  
425   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
426   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
427
428   if (sec)
429     *sec=result->sec;
430   if (bw)
431     *bw =result->bw;
432
433   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
434         from_name,from_port, to_name,to_port,
435         result->sec,((double)result->bw)/1024.0);
436
437   gras_socket_close(sock);
438   free(result);
439   free(request);
440 }
441
442 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
443                           void            *payload) {
444                           
445   /* specification of the test to run, and our answer */
446   bw_request_t request = *(bw_request_t*)payload;
447   bw_res_t result = xbt_new0(s_bw_res_t,1);
448   gras_socket_t peer,asker;
449
450   asker=gras_msg_cb_ctx_from(ctx);
451   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
452         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
453
454         request->peer.name,request->peer.port);
455   peer = gras_socket_client(request->peer.name,request->peer.port);
456   amok_bw_test(peer,
457                request->buf_size,request->exp_size,request->msg_size,
458                request->min_duration,
459                &(result->sec),&(result->bw));
460  
461   gras_msg_rpcreturn(240,ctx,&result);
462
463   gras_os_sleep(1);
464   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
465   free(request->peer.name);
466   free(request);
467   free(result);
468   
469   return 1;
470 }
471
472 /** \brief builds a matrix of results of bandwidth measurement */
473 double * amok_bw_matrix(xbt_dynar_t peers,
474                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
475                         double min_duration) { 
476   double sec;
477   /* construction of matrices for bandwith and latency */
478
479
480   int i,j,len=xbt_dynar_length(peers);
481
482   double *matrix_res = xbt_new0(double, len*len);
483   xbt_peer_t p1,p2;
484
485   xbt_dynar_foreach (peers,i,p1) {
486     xbt_dynar_foreach (peers,j,p2) {
487       if (i!=j) {
488         /* Mesurements of Bandwidth */
489         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
490                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
491                         &sec,&matrix_res[i*len + j]);
492       } 
493     }
494   }
495   return matrix_res;
496 }