Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
a1a8623ab954eca5195d0793c6a00ef4c96cb477
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"msg_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_amount",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register("BW request",  &amok_bw_cb_bw_request);
90   gras_cb_register("BW handshake",&amok_bw_cb_bw_handshake);
91 }
92 void amok_bw_bw_leave() {
93   gras_cb_unregister("BW request",  &amok_bw_cb_bw_request);
94   gras_cb_unregister("BW handshake",&amok_bw_cb_bw_handshake);
95 }
96
97 /**
98  * \brief bandwidth measurement between localhost and \e peer
99  * 
100  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
101  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
102  * \arg msg_size: Size of each message sent. 
103  * \arg msg_amount: Amount of such messages to exchange 
104  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
105  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
106  * \arg bw: observed Bandwidth (in byte/s) 
107  *
108  * Conduct a bandwidth test from the local process to the given peer.
109  * This call is blocking until the end of the experiment.
110  *
111  * If the asked experiment lasts less than \a min_duration, another one will be
112  * launched (and others, if needed). msg_size will be multiplicated by
113  * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
114  * reach the \a min_duration). In that case, the reported bandwidth and
115  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
116  * because we need to malloc a block of this size in RL to conduct the
117  * experiment, and we still don't want to visit the swap. In such case, the 
118  * number of messages is increased instead of their size.
119  *
120  * Results are reported in last args, and sizes are in byte.
121  * 
122  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
123  *           as the total amount of data to send and the msg_size. This
124  *           was changed for the fool wanting to send more than MAXINT
125  *           bytes in a fat pipe.
126  * 
127  */
128 void amok_bw_test(gras_socket_t peer,
129                   unsigned long int buf_size,
130                   unsigned long int msg_size,
131                   unsigned long int msg_amount,
132                   double min_duration,
133           /*OUT*/ double *sec, double *bw) {
134
135   /* Measurement sockets for the experiments */
136   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
137   int port;
138   bw_request_t request,request_ack;
139   xbt_ex_t e;
140   int first_pass; 
141   
142   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
143     TRY {
144       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
145     } CATCH(e) {
146       measMasterIn = NULL;
147       if (port == 10000 -1) {
148         RETHROW0("Error caught while opening a measurement socket: %s");
149       } else {
150         xbt_ex_free(e); 
151       }
152     }
153   }
154   
155   request=xbt_new0(s_bw_request_t,1);
156   request->buf_size=buf_size;
157   request->msg_size=msg_size;
158   request->msg_amount=msg_amount;
159   request->peer.name = NULL;
160   request->peer.port = gras_socket_my_port(measMasterIn);
161   DEBUG6("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)", 
162         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
163         request->buf_size,request->msg_size,request->msg_amount);
164
165   TRY {
166     gras_msg_rpccall(peer,15,"BW handshake",&request, &request_ack);
167   } CATCH(e) {
168     RETHROW0("Error encountered while sending the BW request: %s");
169   }
170   measIn = gras_socket_meas_accept(measMasterIn);
171    
172   TRY {
173     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
174                                    request_ack->peer.port, 
175                                    request->buf_size,1);
176   } CATCH(e) {
177     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
178              gras_socket_peer_name(peer),request_ack->peer.port);
179   }
180   DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
181          request->msg_size, request->msg_amount);
182
183   *sec = 0;
184   first_pass = 1;
185   do {
186     if (first_pass == 0) {
187       double meas_duration=*sec;
188       double increase;
189       if (*sec != 0.0 ) {
190          increase = (min_duration / meas_duration) * 1.1;
191       } else {
192          increase = 4; 
193       }
194       /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
195       if (increase > 20)
196          increase = 20; 
197             
198       request->msg_size = request->msg_size * increase;
199
200       /* Do not do too large experiments messages or the sensors 
201          will start to swap to store one of them.
202          And then increase the number of messages to compensate (check for overflow there, too) */
203       if (request->msg_size > 64*1024*1024) {
204          unsigned long int new_amount = ( (request->msg_size / ((double)64*1024*1024)) 
205                                           * request->msg_amount ) + 1;
206         
207          xbt_assert0(new_amount > request->msg_amount,
208                      "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
209          request->msg_amount = new_amount;
210          
211          request->msg_size = 64*1024*1024;
212       }
213
214       VERB5("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
215             meas_duration, min_duration, 
216             request->msg_size, request->msg_amount,
217             ((double)request->msg_size) * ((double)request->msg_amount / (*sec) /1024.0/1024.0));
218
219       gras_msg_rpccall(peer, 60, "BW reask",&request, NULL);      
220     }
221
222     first_pass = 0;
223     *sec=gras_os_time();
224     TRY {
225       gras_socket_meas_send(measOut,120,request->msg_size,request->msg_amount);
226       DEBUG0("Data sent. Wait ACK");
227       gras_socket_meas_recv(measIn,120,1,1);
228     } CATCH(e) {
229       gras_socket_close(measOut);
230       gras_socket_close(measMasterIn);
231       gras_socket_close(measIn);
232       RETHROW0("Unable to conduct the experiment: %s");
233     }
234     *sec = gras_os_time() - *sec;
235     if (*sec != 0.0) { 
236        *bw = ((double)request->msg_size) * ((double)request->msg_amount) / (*sec);
237     }
238     DEBUG1("Experiment done ; it took %f sec", *sec);
239     if (*sec <= 0) {
240       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
241     }
242
243   } while (*sec < min_duration);
244
245   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
246          *sec,*bw);
247   gras_msg_send(peer, "BW stop", NULL);      
248
249   free(request_ack);
250   free(request);
251   if (measIn != measMasterIn)
252     gras_socket_close(measIn);
253   gras_socket_close(measMasterIn);
254   gras_socket_close(measOut);
255 }
256
257
258 /* Callback to the "BW handshake" message: 
259    opens a server measurement socket,
260    indicate its port in an "BW handshaked" message,
261    receive the corresponding data on the measurement socket, 
262    close the measurement socket
263
264    sizes are in byte
265 */
266 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
267                             void          *payload) {
268   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
269   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
270   bw_request_t request=*(bw_request_t*)payload;
271   bw_request_t answer;
272   xbt_ex_t e;
273   int port;
274   int tooshort = 1;
275   gras_msg_cb_ctx_t ctx_reask;
276   static xbt_dynar_t msgtwaited=NULL;
277   
278   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
279         gras_socket_peer_name(expeditor),request->peer.port,
280         request->buf_size,request->msg_size, request->msg_amount);     
281
282   /* Build our answer */
283   answer = xbt_new0(s_bw_request_t,1);
284   
285   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
286     TRY {
287       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
288     } CATCH(e) {
289       measMasterIn = NULL;
290       if (port < 10000)
291         xbt_ex_free(e);
292       else
293         /* FIXME: tell error to remote */
294         RETHROW0("Error encountered while opening a measurement server socket: %s");
295     }
296   }
297    
298   answer->buf_size=request->buf_size;
299   answer->msg_size=request->msg_size;
300   answer->msg_amount=request->msg_amount;
301   answer->peer.port=gras_socket_my_port(measMasterIn);
302
303   TRY {
304     gras_msg_rpcreturn(60,ctx,&answer);
305   } CATCH(e) { 
306     gras_socket_close(measMasterIn);
307     /* FIXME: tell error to remote */
308     RETHROW0("Error encountered while sending the answer: %s");
309   }
310
311
312   /* Don't connect asap to leave time to other side to enter the accept() */
313   TRY {
314     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
315                                      request->peer.port,
316                                      request->buf_size,1);
317   } CATCH(e) {
318     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
319              gras_socket_peer_name(expeditor),request->peer.port);
320     /* FIXME: tell error to remote */
321   }
322
323   TRY {
324     measIn = gras_socket_meas_accept(measMasterIn);
325     DEBUG4("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
326            answer->buf_size,answer->msg_size,answer->msg_amount, answer->peer.port);
327   } CATCH(e) {
328     gras_socket_close(measMasterIn);
329     gras_socket_close(measIn);
330     gras_socket_close(measOut);
331     /* FIXME: tell error to remote ? */
332     RETHROW0("Error encountered while opening the meas socket: %s");
333   }
334
335   if (!msgtwaited) {
336     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
337     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
338     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
339   }
340
341   while (tooshort) {
342     void *payload;
343     int msggot;
344     TRY {
345       gras_socket_meas_recv(measIn, 120,request->msg_size,request->msg_amount);
346       gras_socket_meas_send(measOut,120,1,1);
347     } CATCH(e) {
348       gras_socket_close(measMasterIn);
349       gras_socket_close(measIn);
350       gras_socket_close(measOut);
351       /* FIXME: tell error to remote ? */
352       RETHROW0("Error encountered while receiving the experiment: %s");
353     }
354     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
355     switch(msggot) {
356     case 0: /* BW stop */
357       tooshort = 0;
358       break;
359     case 1: /* BW reask */
360       tooshort = 1;
361       free(request);
362       request = (bw_request_t)payload;
363       VERB0("Return the reasking RPC");
364       gras_msg_rpcreturn(60,ctx_reask,NULL);
365     }
366     gras_msg_cb_ctx_free(ctx_reask);
367   }
368
369   if (measIn != measMasterIn)
370     gras_socket_close(measMasterIn);
371   gras_socket_close(measIn);
372   gras_socket_close(measOut);
373   free(answer);
374   free(request);
375   VERB0("BW experiment done.");
376   return 0;
377 }
378
379 /**
380  * \brief request a bandwidth measurement between two remote peers
381  *
382  * \arg from_name: Name of the first peer 
383  * \arg from_port: port on which the first process is listening for messages
384  * \arg to_name: Name of the second peer 
385  * \arg to_port: port on which the second process is listening (for messages, do not 
386  * give a measurement socket here. The needed measurement sockets will be created 
387  * automatically and negociated between the peers)
388  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
389  * \arg msg_size: Size of each message sent. 
390  * \arg msg_amount: Amount of such data to exchange
391  * \arg sec: where the result (in seconds) should be stored.
392  * \arg bw: observed Bandwidth (in byte/s)
393  *
394  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
395  * This call is blocking until the end of the experiment.
396  *
397  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
398  *           as the total amount of data to send and the msg_size. This
399  *           was changed for the fool wanting to send more than MAXINT
400  *           bytes in a fat pipe.
401  * 
402  * Results are reported in last args, and sizes are in bytes.
403  */
404 void amok_bw_request(const char* from_name,unsigned int from_port,
405                      const char* to_name,unsigned int to_port,
406                      unsigned long int buf_size,
407                      unsigned long int msg_size,
408                      unsigned long int msg_amount,
409                      double min_duration,
410              /*OUT*/ double *sec, double*bw) {
411   
412   gras_socket_t sock;
413   /* The request */
414   bw_request_t request;
415   bw_res_t result;
416   request=xbt_new0(s_bw_request_t,1);
417   request->buf_size=buf_size;
418   request->msg_size=msg_size;
419   request->msg_amount=msg_amount;
420   request->min_duration = min_duration;
421
422
423   request->peer.name = (char*)to_name;
424   request->peer.port = to_port;
425
426
427   sock = gras_socket_client(from_name,from_port);
428  
429     
430  
431   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
432   gras_msg_rpccall(sock,20*60,"BW request", &request, &result);
433
434   if (sec)
435     *sec=result->sec;
436   if (bw)
437     *bw =result->bw;
438
439   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
440         from_name,from_port, to_name,to_port,
441         result->sec,((double)result->bw)/1024.0);
442
443   gras_socket_close(sock);
444   free(result);
445   free(request);
446 }
447
448 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
449                           void            *payload) {
450                           
451   /* specification of the test to run, and our answer */
452   bw_request_t request = *(bw_request_t*)payload;
453   bw_res_t result = xbt_new0(s_bw_res_t,1);
454   gras_socket_t peer,asker;
455
456   asker=gras_msg_cb_ctx_from(ctx);
457   VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
458         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
459
460         request->peer.name,request->peer.port,
461         request->msg_size,request->msg_amount);
462   peer = gras_socket_client(request->peer.name,request->peer.port);
463   amok_bw_test(peer,
464                request->buf_size,request->msg_size,request->msg_amount,
465                request->min_duration,
466                &(result->sec),&(result->bw));
467  
468   gras_msg_rpcreturn(240,ctx,&result);
469
470   gras_os_sleep(1);
471   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
472   free(request->peer.name);
473   free(request);
474   free(result);
475   
476   return 0;
477 }
478
479 /** \brief builds a matrix of results of bandwidth measurement
480  * 
481  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
482  *           as the total amount of data to send and the msg_size. This
483  *           was changed for the fool wanting to send more than MAXINT
484  *           bytes in a fat pipe.
485  */
486 double * amok_bw_matrix(xbt_dynar_t peers,
487                         int buf_size_bw, int msg_size_bw, int msg_amount_bw,
488                         double min_duration) { 
489   double sec;
490   /* construction of matrices for bandwith and latency */
491
492
493   int i,j,len=xbt_dynar_length(peers);
494
495   double *matrix_res = xbt_new0(double, len*len);
496   xbt_peer_t p1,p2;
497
498   xbt_dynar_foreach (peers,i,p1) {
499     xbt_dynar_foreach (peers,j,p2) {
500       if (i!=j) {
501         /* Mesurements of Bandwidth */
502         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
503                         buf_size_bw,msg_size_bw,msg_amount_bw,min_duration,
504                         &sec,&matrix_res[i*len + j]);
505       } 
506     }
507   }
508   return matrix_res;
509 }