Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
First wave of GRAS API breaking: gras_cb_register wants a message name (char*) as...
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"msg_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_amount",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register("BW request",  &amok_bw_cb_bw_request);
90   gras_cb_register("BW handshake",&amok_bw_cb_bw_handshake);
91 }
92 void amok_bw_bw_leave() {
93   gras_cb_unregister("BW request",  &amok_bw_cb_bw_request);
94   gras_cb_unregister("BW handshake",&amok_bw_cb_bw_handshake);
95 }
96
97 /**
98  * \brief bandwidth measurement between localhost and \e peer
99  * 
100  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
101  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
102  * \arg msg_size: Size of each message sent. 
103  * \arg msg_amount: Amount of such messages to exchange 
104  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
105  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
106  * \arg bw: observed Bandwidth (in byte/s) 
107  *
108  * Conduct a bandwidth test from the local process to the given peer.
109  * This call is blocking until the end of the experiment.
110  *
111  * If the asked experiment lasts less than \a min_duration, another one will be
112  * launched (and others, if needed). msg_size will be multiplicated by
113  * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
114  * reach the \a min_duration). In that case, the reported bandwidth and
115  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
116  * because we need to malloc a block of this size in RL to conduct the
117  * experiment, and we still don't want to visit the swap. In such case, the 
118  * number of messages is increased instead of their size.
119  *
120  * Results are reported in last args, and sizes are in byte.
121  * 
122  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
123  *           as the total amount of data to send and the msg_size. This
124  *           was changed for the fool wanting to send more than MAXINT
125  *           bytes in a fat pipe.
126  * 
127  */
128 void amok_bw_test(gras_socket_t peer,
129                   unsigned long int buf_size,
130                   unsigned long int msg_size,
131                   unsigned long int msg_amount,
132                   double min_duration,
133           /*OUT*/ double *sec, double *bw) {
134
135   /* Measurement sockets for the experiments */
136   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
137   int port;
138   bw_request_t request,request_ack;
139   xbt_ex_t e;
140   int first_pass; 
141   
142   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
143     TRY {
144       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
145     } CATCH(e) {
146       measMasterIn = NULL;
147       if (port == 10000 -1) {
148         RETHROW0("Error caught while opening a measurement socket: %s");
149       } else {
150         xbt_ex_free(e); 
151       }
152     }
153   }
154   
155   request=xbt_new0(s_bw_request_t,1);
156   request->buf_size=buf_size;
157   request->msg_size=msg_size;
158   request->msg_amount=msg_amount;
159   request->peer.name = NULL;
160   request->peer.port = gras_socket_my_port(measMasterIn);
161   DEBUG6("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)", 
162         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
163         request->buf_size,request->msg_size,request->msg_amount);
164
165   TRY {
166     gras_msg_rpccall(peer,15,
167                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
168   } CATCH(e) {
169     RETHROW0("Error encountered while sending the BW request: %s");
170   }
171   measIn = gras_socket_meas_accept(measMasterIn);
172    
173   TRY {
174     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
175                                    request_ack->peer.port, 
176                                    request->buf_size,1);
177   } CATCH(e) {
178     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
179              gras_socket_peer_name(peer),request_ack->peer.port);
180   }
181   DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
182          request->msg_size, request->msg_amount);
183
184   *sec = 0;
185   first_pass = 1;
186   do {
187     if (first_pass == 0) {
188       double meas_duration=*sec;
189       double increase;
190       if (*sec != 0.0 ) {
191          increase = (min_duration / meas_duration) * 1.1;
192       } else {
193          increase = 4; 
194       }
195       /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
196       if (increase > 20)
197          increase = 20; 
198             
199       request->msg_size = request->msg_size * increase;
200
201       /* Do not do too large experiments messages or the sensors 
202          will start to swap to store one of them.
203          And then increase the number of messages to compensate (check for overflow there, too) */
204       if (request->msg_size > 64*1024*1024) {
205          unsigned long int new_amount = ( (request->msg_size / ((double)64*1024*1024)) 
206                                           * request->msg_amount ) + 1;
207         
208          xbt_assert0(new_amount > request->msg_amount,
209                      "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
210          request->msg_amount = new_amount;
211          
212          request->msg_size = 64*1024*1024;
213       }
214
215       VERB5("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
216             meas_duration, min_duration, 
217             request->msg_size, request->msg_amount,
218             ((double)request->msg_size) * ((double)request->msg_amount / (*sec) /1024.0/1024.0));
219
220       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
221     }
222
223     first_pass = 0;
224     *sec=gras_os_time();
225     TRY {
226       gras_socket_meas_send(measOut,120,request->msg_size,request->msg_amount);
227       DEBUG0("Data sent. Wait ACK");
228       gras_socket_meas_recv(measIn,120,1,1);
229     } CATCH(e) {
230       gras_socket_close(measOut);
231       gras_socket_close(measMasterIn);
232       gras_socket_close(measIn);
233       RETHROW0("Unable to conduct the experiment: %s");
234     }
235     *sec = gras_os_time() - *sec;
236     if (*sec != 0.0) { 
237        *bw = ((double)request->msg_size) * ((double)request->msg_amount) / (*sec);
238     }
239     DEBUG1("Experiment done ; it took %f sec", *sec);
240     if (*sec <= 0) {
241       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
242     }
243
244   } while (*sec < min_duration);
245
246   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
247          *sec,*bw);
248   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
249
250   free(request_ack);
251   free(request);
252   if (measIn != measMasterIn)
253     gras_socket_close(measIn);
254   gras_socket_close(measMasterIn);
255   gras_socket_close(measOut);
256 }
257
258
259 /* Callback to the "BW handshake" message: 
260    opens a server measurement socket,
261    indicate its port in an "BW handshaked" message,
262    receive the corresponding data on the measurement socket, 
263    close the measurement socket
264
265    sizes are in byte
266 */
267 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
268                             void          *payload) {
269   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
270   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
271   bw_request_t request=*(bw_request_t*)payload;
272   bw_request_t answer;
273   xbt_ex_t e;
274   int port;
275   int tooshort = 1;
276   gras_msg_cb_ctx_t ctx_reask;
277   static xbt_dynar_t msgtwaited=NULL;
278   
279   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
280         gras_socket_peer_name(expeditor),request->peer.port,
281         request->buf_size,request->msg_size, request->msg_amount);     
282
283   /* Build our answer */
284   answer = xbt_new0(s_bw_request_t,1);
285   
286   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
287     TRY {
288       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
289     } CATCH(e) {
290       measMasterIn = NULL;
291       if (port < 10000)
292         xbt_ex_free(e);
293       else
294         /* FIXME: tell error to remote */
295         RETHROW0("Error encountered while opening a measurement server socket: %s");
296     }
297   }
298    
299   answer->buf_size=request->buf_size;
300   answer->msg_size=request->msg_size;
301   answer->msg_amount=request->msg_amount;
302   answer->peer.port=gras_socket_my_port(measMasterIn);
303
304   TRY {
305     gras_msg_rpcreturn(60,ctx,&answer);
306   } CATCH(e) { 
307     gras_socket_close(measMasterIn);
308     /* FIXME: tell error to remote */
309     RETHROW0("Error encountered while sending the answer: %s");
310   }
311
312
313   /* Don't connect asap to leave time to other side to enter the accept() */
314   TRY {
315     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
316                                      request->peer.port,
317                                      request->buf_size,1);
318   } CATCH(e) {
319     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
320              gras_socket_peer_name(expeditor),request->peer.port);
321     /* FIXME: tell error to remote */
322   }
323
324   TRY {
325     measIn = gras_socket_meas_accept(measMasterIn);
326     DEBUG4("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
327            answer->buf_size,answer->msg_size,answer->msg_amount, answer->peer.port);
328   } CATCH(e) {
329     gras_socket_close(measMasterIn);
330     gras_socket_close(measIn);
331     gras_socket_close(measOut);
332     /* FIXME: tell error to remote ? */
333     RETHROW0("Error encountered while opening the meas socket: %s");
334   }
335
336   if (!msgtwaited) {
337     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
338     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
339     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
340   }
341
342   while (tooshort) {
343     void *payload;
344     int msggot;
345     TRY {
346       gras_socket_meas_recv(measIn, 120,request->msg_size,request->msg_amount);
347       gras_socket_meas_send(measOut,120,1,1);
348     } CATCH(e) {
349       gras_socket_close(measMasterIn);
350       gras_socket_close(measIn);
351       gras_socket_close(measOut);
352       /* FIXME: tell error to remote ? */
353       RETHROW0("Error encountered while receiving the experiment: %s");
354     }
355     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
356     switch(msggot) {
357     case 0: /* BW stop */
358       tooshort = 0;
359       break;
360     case 1: /* BW reask */
361       tooshort = 1;
362       free(request);
363       request = (bw_request_t)payload;
364       VERB0("Return the reasking RPC");
365       gras_msg_rpcreturn(60,ctx_reask,NULL);
366     }
367     gras_msg_cb_ctx_free(ctx_reask);
368   }
369
370   if (measIn != measMasterIn)
371     gras_socket_close(measMasterIn);
372   gras_socket_close(measIn);
373   gras_socket_close(measOut);
374   free(answer);
375   free(request);
376   VERB0("BW experiment done.");
377   return 1;
378 }
379
380 /**
381  * \brief request a bandwidth measurement between two remote peers
382  *
383  * \arg from_name: Name of the first peer 
384  * \arg from_port: port on which the first process is listening for messages
385  * \arg to_name: Name of the second peer 
386  * \arg to_port: port on which the second process is listening (for messages, do not 
387  * give a measurement socket here. The needed measurement sockets will be created 
388  * automatically and negociated between the peers)
389  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
390  * \arg msg_size: Size of each message sent. 
391  * \arg msg_amount: Amount of such data to exchange
392  * \arg sec: where the result (in seconds) should be stored.
393  * \arg bw: observed Bandwidth (in byte/s)
394  *
395  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
396  * This call is blocking until the end of the experiment.
397  *
398  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
399  *           as the total amount of data to send and the msg_size. This
400  *           was changed for the fool wanting to send more than MAXINT
401  *           bytes in a fat pipe.
402  * 
403  * Results are reported in last args, and sizes are in bytes.
404  */
405 void amok_bw_request(const char* from_name,unsigned int from_port,
406                      const char* to_name,unsigned int to_port,
407                      unsigned long int buf_size,
408                      unsigned long int msg_size,
409                      unsigned long int msg_amount,
410                      double min_duration,
411              /*OUT*/ double *sec, double*bw) {
412   
413   gras_socket_t sock;
414   /* The request */
415   bw_request_t request;
416   bw_res_t result;
417   request=xbt_new0(s_bw_request_t,1);
418   request->buf_size=buf_size;
419   request->msg_size=msg_size;
420   request->msg_amount=msg_amount;
421   request->min_duration = min_duration;
422
423
424   request->peer.name = (char*)to_name;
425   request->peer.port = to_port;
426
427
428   sock = gras_socket_client(from_name,from_port);
429  
430     
431  
432   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
433   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
434
435   if (sec)
436     *sec=result->sec;
437   if (bw)
438     *bw =result->bw;
439
440   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
441         from_name,from_port, to_name,to_port,
442         result->sec,((double)result->bw)/1024.0);
443
444   gras_socket_close(sock);
445   free(result);
446   free(request);
447 }
448
449 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
450                           void            *payload) {
451                           
452   /* specification of the test to run, and our answer */
453   bw_request_t request = *(bw_request_t*)payload;
454   bw_res_t result = xbt_new0(s_bw_res_t,1);
455   gras_socket_t peer,asker;
456
457   asker=gras_msg_cb_ctx_from(ctx);
458   VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
459         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
460
461         request->peer.name,request->peer.port,
462         request->msg_size,request->msg_amount);
463   peer = gras_socket_client(request->peer.name,request->peer.port);
464   amok_bw_test(peer,
465                request->buf_size,request->msg_size,request->msg_amount,
466                request->min_duration,
467                &(result->sec),&(result->bw));
468  
469   gras_msg_rpcreturn(240,ctx,&result);
470
471   gras_os_sleep(1);
472   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
473   free(request->peer.name);
474   free(request);
475   free(result);
476   
477   return 1;
478 }
479
480 /** \brief builds a matrix of results of bandwidth measurement
481  * 
482  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
483  *           as the total amount of data to send and the msg_size. This
484  *           was changed for the fool wanting to send more than MAXINT
485  *           bytes in a fat pipe.
486  */
487 double * amok_bw_matrix(xbt_dynar_t peers,
488                         int buf_size_bw, int msg_size_bw, int msg_amount_bw,
489                         double min_duration) { 
490   double sec;
491   /* construction of matrices for bandwith and latency */
492
493
494   int i,j,len=xbt_dynar_length(peers);
495
496   double *matrix_res = xbt_new0(double, len*len);
497   xbt_peer_t p1,p2;
498
499   xbt_dynar_foreach (peers,i,p1) {
500     xbt_dynar_foreach (peers,j,p2) {
501       if (i!=j) {
502         /* Mesurements of Bandwidth */
503         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
504                         buf_size_bw,msg_size_bw,msg_amount_bw,min_duration,
505                         &sec,&matrix_res[i*len + j]);
506       } 
507     }
508   }
509   return matrix_res;
510 }