Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
6811ee0e49407e9038ab76454550c63c6fe30b25
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"exp_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register(gras_msgtype_by_name("BW request"),
90                    &amok_bw_cb_bw_request);
91   gras_cb_register(gras_msgtype_by_name("BW handshake"),
92                    &amok_bw_cb_bw_handshake);
93 }
94 void amok_bw_bw_leave() {
95   gras_cb_unregister(gras_msgtype_by_name("BW request"),
96                      &amok_bw_cb_bw_request);
97   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98                      &amok_bw_cb_bw_handshake);
99 }
100
101 /**
102  * \brief bandwidth measurement between localhost and \e peer
103  * 
104  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
105  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
106  * \arg exp_size: Total size of data sent across the network
107  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
108  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110  * \arg bw: observed Bandwidth (in byte/s) 
111  *
112  * Conduct a bandwidth test from the local process to the given peer.
113  * This call is blocking until the end of the experiment.
114  *
115  * If the asked experiment lasts less than \a min_duration, another one will be
116  * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
117  * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118  * reach the \a min_duration). In that case, the reported bandwidth and
119  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
120  * because we need to malloc a block of this size in RL to conduct the
121  * experiment, and we still don't want to visit the swap.
122  *
123  * Results are reported in last args, and sizes are in byte.
124  */
125 void amok_bw_test(gras_socket_t peer,
126                   unsigned long int buf_size,
127                   unsigned long int exp_size,
128                   unsigned long int msg_size,
129                   double min_duration,
130           /*OUT*/ double *sec, double *bw) {
131
132   /* Measurement sockets for the experiments */
133   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
134   int port;
135   bw_request_t request,request_ack;
136   xbt_ex_t e;
137   int first_pass; 
138   int nb_messages = (exp_size % msg_size == 0) ? 
139     (exp_size / msg_size) : (exp_size / msg_size + 1); 
140   
141   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
142     TRY {
143       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
144     } CATCH(e) {
145       measMasterIn = NULL;
146       if (port == 10000 -1) {
147         RETHROW0("Error caught while opening a measurement socket: %s");
148       } else {
149         xbt_ex_free(e); 
150       }
151     }
152   }
153   
154   request=xbt_new0(s_bw_request_t,1);
155   request->buf_size=buf_size;
156   request->exp_size=msg_size * nb_messages;
157   request->msg_size=msg_size;
158   request->peer.name = NULL;
159   request->peer.port = gras_socket_my_port(measMasterIn);
160   DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)", 
161         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
162         buf_size,request->buf_size);
163
164   TRY {
165     gras_msg_rpccall(peer,15,
166                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
167   } CATCH(e) {
168     RETHROW0("Error encountered while sending the BW request: %s");
169   }
170   measIn = gras_socket_meas_accept(measMasterIn);
171    
172   TRY {
173     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174                                    request_ack->peer.port, 
175                                    request->buf_size,1);
176   } CATCH(e) {
177     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178              gras_socket_peer_name(peer),request_ack->peer.port);
179   }
180   DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
181          request->exp_size, request->msg_size);
182
183   *sec = 0;
184   first_pass = 1;
185   do {
186     if (first_pass == 0) {
187       double meas_duration=*sec;
188       double increase;
189       if (*sec != 0.0 ) {
190          increase = (min_duration / meas_duration) * 1.1;
191       } else {
192          increase = 4; 
193       }
194       /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
195       if (increase > 20)
196          increase = 20; 
197             
198       request->msg_size = request->msg_size * increase;
199
200       /* Do not do too large experiments messages or the sensors 
201          will start to swap to store one of them.
202          And then increase the number of messages to compensate */
203       if (request->msg_size > 64*1024*1024) {
204         nb_messages = ( (request->msg_size / ((double)64*1024*1024)) 
205                         * nb_messages ) + 1; 
206         request->msg_size = 64*1024*1024;
207       }
208
209       VERB6("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%lu msg_size=%lu (nb_messages=%d) (got %fkb/s)",
210             meas_duration, min_duration, 
211             request->exp_size, request->msg_size, nb_messages, 
212             ((double)request->exp_size) / *sec/1024);
213
214       xbt_assert0(request->exp_size > request->msg_size * nb_messages,
215                   "Overflow on the experiment size! You must have a *really* fat pipe. Please fix your platform");
216       request->exp_size = request->msg_size * nb_messages;
217
218
219       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
220     }
221
222     first_pass = 0;
223     *sec=gras_os_time();
224     TRY {
225       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
226       DEBUG0("Data sent. Wait ACK");
227       gras_socket_meas_recv(measIn,120,1,1);
228     } CATCH(e) {
229       gras_socket_close(measOut);
230       gras_socket_close(measMasterIn);
231       gras_socket_close(measIn);
232       RETHROW0("Unable to conduct the experiment: %s");
233     }
234     *sec = gras_os_time() - *sec;
235     if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
236     DEBUG1("Experiment done ; it took %f sec", *sec);
237     if (*sec <= 0) {
238       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
239     }
240
241   } while (*sec < min_duration);
242
243   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
244          *sec,*bw);
245   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
246
247   free(request_ack);
248   free(request);
249   if (measIn != measMasterIn)
250     gras_socket_close(measIn);
251   gras_socket_close(measMasterIn);
252   gras_socket_close(measOut);
253 }
254
255
256 /* Callback to the "BW handshake" message: 
257    opens a server measurement socket,
258    indicate its port in an "BW handshaked" message,
259    receive the corresponding data on the measurement socket, 
260    close the measurment socket
261
262    sizes are in byte
263 */
264 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
265                             void          *payload) {
266   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
267   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
268   bw_request_t request=*(bw_request_t*)payload;
269   bw_request_t answer;
270   xbt_ex_t e;
271   int port;
272   int tooshort = 1;
273   gras_msg_cb_ctx_t ctx_reask;
274   static xbt_dynar_t msgtwaited=NULL;
275   
276   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
277         gras_socket_peer_name(expeditor),request->peer.port,
278         request->buf_size,request->exp_size,request->msg_size);     
279
280   /* Build our answer */
281   answer = xbt_new0(s_bw_request_t,1);
282   
283   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
284     TRY {
285       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
286     } CATCH(e) {
287       measMasterIn = NULL;
288       if (port < 10000)
289         xbt_ex_free(e);
290       else
291         /* FIXME: tell error to remote */
292         RETHROW0("Error encountered while opening a measurement server socket: %s");
293     }
294   }
295    
296   answer->buf_size=request->buf_size;
297   answer->exp_size=request->exp_size;
298   answer->msg_size=request->msg_size;
299   answer->peer.port=gras_socket_my_port(measMasterIn);
300
301   TRY {
302     gras_msg_rpcreturn(60,ctx,&answer);
303   } CATCH(e) { 
304     gras_socket_close(measMasterIn);
305     /* FIXME: tell error to remote */
306     RETHROW0("Error encountered while sending the answer: %s");
307   }
308
309
310   /* Don't connect asap to leave time to other side to enter the accept() */
311   TRY {
312     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
313                                      request->peer.port,
314                                      request->buf_size,1);
315   } CATCH(e) {
316     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
317              gras_socket_peer_name(expeditor),request->peer.port);
318     /* FIXME: tell error to remote */
319   }
320
321   TRY {
322     measIn = gras_socket_meas_accept(measMasterIn);
323     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
324            answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
325   } CATCH(e) {
326     gras_socket_close(measMasterIn);
327     gras_socket_close(measIn);
328     gras_socket_close(measOut);
329     /* FIXME: tell error to remote ? */
330     RETHROW0("Error encountered while opening the meas socket: %s");
331   }
332
333   if (!msgtwaited) {
334     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
335     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
336     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
337   }
338
339   while (tooshort) {
340     void *payload;
341     int msggot;
342     TRY {
343       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
344       gras_socket_meas_send(measOut,120,1,1);
345     } CATCH(e) {
346       gras_socket_close(measMasterIn);
347       gras_socket_close(measIn);
348       gras_socket_close(measOut);
349       /* FIXME: tell error to remote ? */
350       RETHROW0("Error encountered while receiving the experiment: %s");
351     }
352     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
353     switch(msggot) {
354     case 0: /* BW stop */
355       tooshort = 0;
356       break;
357     case 1: /* BW reask */
358       tooshort = 1;
359       free(request);
360       request = (bw_request_t)payload;
361       VERB0("Return the reasking RPC");
362       gras_msg_rpcreturn(60,ctx_reask,NULL);
363     }
364     gras_msg_cb_ctx_free(ctx_reask);
365   }
366
367   if (measIn != measMasterIn)
368     gras_socket_close(measMasterIn);
369   gras_socket_close(measIn);
370   gras_socket_close(measOut);
371   free(answer);
372   free(request);
373   VERB0("BW experiment done.");
374   return 1;
375 }
376
377 /**
378  * \brief request a bandwidth measurement between two remote peers
379  *
380  * \arg from_name: Name of the first peer 
381  * \arg from_port: port on which the first process is listening for messages
382  * \arg to_name: Name of the second peer 
383  * \arg to_port: port on which the second process is listening (for messages, do not 
384  * give a measurement socket here. The needed measurement sockets will be created 
385  * automatically and negociated between the peers)
386  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
387  * \arg exp_size: Total size of data sent across the network
388  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
389  * \arg sec: where the result (in seconds) should be stored.
390  * \arg bw: observed Bandwidth (in byte/s)
391  *
392  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
393  * This call is blocking until the end of the experiment.
394  *
395  * Results are reported in last args, and sizes are in bytes.
396  */
397 void amok_bw_request(const char* from_name,unsigned int from_port,
398                      const char* to_name,unsigned int to_port,
399                      unsigned long int buf_size,
400                      unsigned long int exp_size,
401                      unsigned long int msg_size,
402                      double min_duration,
403              /*OUT*/ double *sec, double*bw) {
404   
405   gras_socket_t sock;
406   /* The request */
407   bw_request_t request;
408   bw_res_t result;
409   request=xbt_new0(s_bw_request_t,1);
410   request->buf_size=buf_size;
411   request->exp_size=exp_size;
412   request->msg_size=msg_size;
413   request->min_duration = min_duration;
414
415
416   request->peer.name = (char*)to_name;
417   request->peer.port = to_port;
418
419
420   sock = gras_socket_client(from_name,from_port);
421  
422     
423  
424   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
425   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
426
427   if (sec)
428     *sec=result->sec;
429   if (bw)
430     *bw =result->bw;
431
432   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
433         from_name,from_port, to_name,to_port,
434         result->sec,((double)result->bw)/1024.0);
435
436   gras_socket_close(sock);
437   free(result);
438   free(request);
439 }
440
441 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
442                           void            *payload) {
443                           
444   /* specification of the test to run, and our answer */
445   bw_request_t request = *(bw_request_t*)payload;
446   bw_res_t result = xbt_new0(s_bw_res_t,1);
447   gras_socket_t peer,asker;
448
449   asker=gras_msg_cb_ctx_from(ctx);
450   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
451         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
452
453         request->peer.name,request->peer.port);
454   peer = gras_socket_client(request->peer.name,request->peer.port);
455   amok_bw_test(peer,
456                request->buf_size,request->exp_size,request->msg_size,
457                request->min_duration,
458                &(result->sec),&(result->bw));
459  
460   gras_msg_rpcreturn(240,ctx,&result);
461
462   gras_os_sleep(1);
463   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
464   free(request->peer.name);
465   free(request);
466   free(result);
467   
468   return 1;
469 }
470
471 /** \brief builds a matrix of results of bandwidth measurement */
472 double * amok_bw_matrix(xbt_dynar_t peers,
473                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
474                         double min_duration) { 
475   double sec;
476   /* construction of matrices for bandwith and latency */
477
478
479   int i,j,len=xbt_dynar_length(peers);
480
481   double *matrix_res = xbt_new0(double, len*len);
482   xbt_peer_t p1,p2;
483
484   xbt_dynar_foreach (peers,i,p1) {
485     xbt_dynar_foreach (peers,j,p2) {
486       if (i!=j) {
487         /* Mesurements of Bandwidth */
488         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
489                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
490                         &sec,&matrix_res[i*len + j]);
491       } 
492     }
493   }
494   return matrix_res;
495 }