Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Some more conversion to windows decoration cruft
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"msg_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_amount",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register(gras_msgtype_by_name("BW request"),
90                    &amok_bw_cb_bw_request);
91   gras_cb_register(gras_msgtype_by_name("BW handshake"),
92                    &amok_bw_cb_bw_handshake);
93 }
94 void amok_bw_bw_leave() {
95   gras_cb_unregister(gras_msgtype_by_name("BW request"),
96                      &amok_bw_cb_bw_request);
97   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98                      &amok_bw_cb_bw_handshake);
99 }
100
101 /**
102  * \brief bandwidth measurement between localhost and \e peer
103  * 
104  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
105  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
106  * \arg msg_size: Size of each message sent. 
107  * \arg msg_amount: Amount of such messages to exchange 
108  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110  * \arg bw: observed Bandwidth (in byte/s) 
111  *
112  * Conduct a bandwidth test from the local process to the given peer.
113  * This call is blocking until the end of the experiment.
114  *
115  * If the asked experiment lasts less than \a min_duration, another one will be
116  * launched (and others, if needed). msg_size will be multiplicated by
117  * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
118  * reach the \a min_duration). In that case, the reported bandwidth and
119  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
120  * because we need to malloc a block of this size in RL to conduct the
121  * experiment, and we still don't want to visit the swap. In such case, the 
122  * number of messages is increased instead of their size.
123  *
124  * Results are reported in last args, and sizes are in byte.
125  * 
126  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
127  *           as the total amount of data to send and the msg_size. This
128  *           was changed for the fool wanting to send more than MAXINT
129  *           bytes in a fat pipe.
130  * 
131  */
132 void amok_bw_test(gras_socket_t peer,
133                   unsigned long int buf_size,
134                   unsigned long int msg_size,
135                   unsigned long int msg_amount,
136                   double min_duration,
137           /*OUT*/ double *sec, double *bw) {
138
139   /* Measurement sockets for the experiments */
140   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
141   int port;
142   bw_request_t request,request_ack;
143   xbt_ex_t e;
144   int first_pass; 
145   
146   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
147     TRY {
148       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
149     } CATCH(e) {
150       measMasterIn = NULL;
151       if (port == 10000 -1) {
152         RETHROW0("Error caught while opening a measurement socket: %s");
153       } else {
154         xbt_ex_free(e); 
155       }
156     }
157   }
158   
159   request=xbt_new0(s_bw_request_t,1);
160   request->buf_size=buf_size;
161   request->msg_size=msg_size;
162   request->msg_amount=msg_amount;
163   request->peer.name = NULL;
164   request->peer.port = gras_socket_my_port(measMasterIn);
165   DEBUG6("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)", 
166         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
167         request->buf_size,request->msg_size,request->msg_amount);
168
169   TRY {
170     gras_msg_rpccall(peer,15,
171                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
172   } CATCH(e) {
173     RETHROW0("Error encountered while sending the BW request: %s");
174   }
175   measIn = gras_socket_meas_accept(measMasterIn);
176    
177   TRY {
178     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
179                                    request_ack->peer.port, 
180                                    request->buf_size,1);
181   } CATCH(e) {
182     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
183              gras_socket_peer_name(peer),request_ack->peer.port);
184   }
185   DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
186          request->msg_size, request->msg_amount);
187
188   *sec = 0;
189   first_pass = 1;
190   do {
191     if (first_pass == 0) {
192       double meas_duration=*sec;
193       double increase;
194       if (*sec != 0.0 ) {
195          increase = (min_duration / meas_duration) * 1.1;
196       } else {
197          increase = 4; 
198       }
199       /* Do not increase the exp size too fast since our decision would be based on wrong measurements*/
200       if (increase > 20)
201          increase = 20; 
202             
203       request->msg_size = request->msg_size * increase;
204
205       /* Do not do too large experiments messages or the sensors 
206          will start to swap to store one of them.
207          And then increase the number of messages to compensate (check for overflow there, too) */
208       if (request->msg_size > 64*1024*1024) {
209          unsigned long int new_amount = ( (request->msg_size / ((double)64*1024*1024)) 
210                                           * request->msg_amount ) + 1;
211         
212          xbt_assert0(new_amount > request->msg_amount,
213                      "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
214          request->msg_amount = new_amount;
215          
216          request->msg_size = 64*1024*1024;
217       }
218
219       VERB5("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
220             meas_duration, min_duration, 
221             request->msg_size, request->msg_amount,
222             ((double)request->msg_size) * ((double)request->msg_amount / (*sec) /1024.0/1024.0));
223
224       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
225     }
226
227     first_pass = 0;
228     *sec=gras_os_time();
229     TRY {
230       gras_socket_meas_send(measOut,120,request->msg_size,request->msg_amount);
231       DEBUG0("Data sent. Wait ACK");
232       gras_socket_meas_recv(measIn,120,1,1);
233     } CATCH(e) {
234       gras_socket_close(measOut);
235       gras_socket_close(measMasterIn);
236       gras_socket_close(measIn);
237       RETHROW0("Unable to conduct the experiment: %s");
238     }
239     *sec = gras_os_time() - *sec;
240     if (*sec != 0.0) { 
241        *bw = ((double)request->msg_size) * ((double)request->msg_amount) / (*sec);
242     }
243     DEBUG1("Experiment done ; it took %f sec", *sec);
244     if (*sec <= 0) {
245       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
246     }
247
248   } while (*sec < min_duration);
249
250   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
251          *sec,*bw);
252   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
253
254   free(request_ack);
255   free(request);
256   if (measIn != measMasterIn)
257     gras_socket_close(measIn);
258   gras_socket_close(measMasterIn);
259   gras_socket_close(measOut);
260 }
261
262
263 /* Callback to the "BW handshake" message: 
264    opens a server measurement socket,
265    indicate its port in an "BW handshaked" message,
266    receive the corresponding data on the measurement socket, 
267    close the measurement socket
268
269    sizes are in byte
270 */
271 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
272                             void          *payload) {
273   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
274   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
275   bw_request_t request=*(bw_request_t*)payload;
276   bw_request_t answer;
277   xbt_ex_t e;
278   int port;
279   int tooshort = 1;
280   gras_msg_cb_ctx_t ctx_reask;
281   static xbt_dynar_t msgtwaited=NULL;
282   
283   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
284         gras_socket_peer_name(expeditor),request->peer.port,
285         request->buf_size,request->msg_size, request->msg_amount);     
286
287   /* Build our answer */
288   answer = xbt_new0(s_bw_request_t,1);
289   
290   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
291     TRY {
292       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
293     } CATCH(e) {
294       measMasterIn = NULL;
295       if (port < 10000)
296         xbt_ex_free(e);
297       else
298         /* FIXME: tell error to remote */
299         RETHROW0("Error encountered while opening a measurement server socket: %s");
300     }
301   }
302    
303   answer->buf_size=request->buf_size;
304   answer->msg_size=request->msg_size;
305   answer->msg_amount=request->msg_amount;
306   answer->peer.port=gras_socket_my_port(measMasterIn);
307
308   TRY {
309     gras_msg_rpcreturn(60,ctx,&answer);
310   } CATCH(e) { 
311     gras_socket_close(measMasterIn);
312     /* FIXME: tell error to remote */
313     RETHROW0("Error encountered while sending the answer: %s");
314   }
315
316
317   /* Don't connect asap to leave time to other side to enter the accept() */
318   TRY {
319     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
320                                      request->peer.port,
321                                      request->buf_size,1);
322   } CATCH(e) {
323     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
324              gras_socket_peer_name(expeditor),request->peer.port);
325     /* FIXME: tell error to remote */
326   }
327
328   TRY {
329     measIn = gras_socket_meas_accept(measMasterIn);
330     DEBUG4("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
331            answer->buf_size,answer->msg_size,answer->msg_amount, answer->peer.port);
332   } CATCH(e) {
333     gras_socket_close(measMasterIn);
334     gras_socket_close(measIn);
335     gras_socket_close(measOut);
336     /* FIXME: tell error to remote ? */
337     RETHROW0("Error encountered while opening the meas socket: %s");
338   }
339
340   if (!msgtwaited) {
341     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
342     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
343     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
344   }
345
346   while (tooshort) {
347     void *payload;
348     int msggot;
349     TRY {
350       gras_socket_meas_recv(measIn, 120,request->msg_size,request->msg_amount);
351       gras_socket_meas_send(measOut,120,1,1);
352     } CATCH(e) {
353       gras_socket_close(measMasterIn);
354       gras_socket_close(measIn);
355       gras_socket_close(measOut);
356       /* FIXME: tell error to remote ? */
357       RETHROW0("Error encountered while receiving the experiment: %s");
358     }
359     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
360     switch(msggot) {
361     case 0: /* BW stop */
362       tooshort = 0;
363       break;
364     case 1: /* BW reask */
365       tooshort = 1;
366       free(request);
367       request = (bw_request_t)payload;
368       VERB0("Return the reasking RPC");
369       gras_msg_rpcreturn(60,ctx_reask,NULL);
370     }
371     gras_msg_cb_ctx_free(ctx_reask);
372   }
373
374   if (measIn != measMasterIn)
375     gras_socket_close(measMasterIn);
376   gras_socket_close(measIn);
377   gras_socket_close(measOut);
378   free(answer);
379   free(request);
380   VERB0("BW experiment done.");
381   return 1;
382 }
383
384 /**
385  * \brief request a bandwidth measurement between two remote peers
386  *
387  * \arg from_name: Name of the first peer 
388  * \arg from_port: port on which the first process is listening for messages
389  * \arg to_name: Name of the second peer 
390  * \arg to_port: port on which the second process is listening (for messages, do not 
391  * give a measurement socket here. The needed measurement sockets will be created 
392  * automatically and negociated between the peers)
393  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
394  * \arg msg_size: Size of each message sent. 
395  * \arg msg_amount: Amount of such data to exchange
396  * \arg sec: where the result (in seconds) should be stored.
397  * \arg bw: observed Bandwidth (in byte/s)
398  *
399  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
400  * This call is blocking until the end of the experiment.
401  *
402  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
403  *           as the total amount of data to send and the msg_size. This
404  *           was changed for the fool wanting to send more than MAXINT
405  *           bytes in a fat pipe.
406  * 
407  * Results are reported in last args, and sizes are in bytes.
408  */
409 void amok_bw_request(const char* from_name,unsigned int from_port,
410                      const char* to_name,unsigned int to_port,
411                      unsigned long int buf_size,
412                      unsigned long int msg_size,
413                      unsigned long int msg_amount,
414                      double min_duration,
415              /*OUT*/ double *sec, double*bw) {
416   
417   gras_socket_t sock;
418   /* The request */
419   bw_request_t request;
420   bw_res_t result;
421   request=xbt_new0(s_bw_request_t,1);
422   request->buf_size=buf_size;
423   request->msg_size=msg_size;
424   request->msg_amount=msg_amount;
425   request->min_duration = min_duration;
426
427
428   request->peer.name = (char*)to_name;
429   request->peer.port = to_port;
430
431
432   sock = gras_socket_client(from_name,from_port);
433  
434     
435  
436   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
437   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
438
439   if (sec)
440     *sec=result->sec;
441   if (bw)
442     *bw =result->bw;
443
444   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
445         from_name,from_port, to_name,to_port,
446         result->sec,((double)result->bw)/1024.0);
447
448   gras_socket_close(sock);
449   free(result);
450   free(request);
451 }
452
453 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
454                           void            *payload) {
455                           
456   /* specification of the test to run, and our answer */
457   bw_request_t request = *(bw_request_t*)payload;
458   bw_res_t result = xbt_new0(s_bw_res_t,1);
459   gras_socket_t peer,asker;
460
461   asker=gras_msg_cb_ctx_from(ctx);
462   VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
463         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
464
465         request->peer.name,request->peer.port,
466         request->msg_size,request->msg_amount);
467   peer = gras_socket_client(request->peer.name,request->peer.port);
468   amok_bw_test(peer,
469                request->buf_size,request->msg_size,request->msg_amount,
470                request->min_duration,
471                &(result->sec),&(result->bw));
472  
473   gras_msg_rpcreturn(240,ctx,&result);
474
475   gras_os_sleep(1);
476   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
477   free(request->peer.name);
478   free(request);
479   free(result);
480   
481   return 1;
482 }
483
484 /** \brief builds a matrix of results of bandwidth measurement
485  * 
486  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
487  *           as the total amount of data to send and the msg_size. This
488  *           was changed for the fool wanting to send more than MAXINT
489  *           bytes in a fat pipe.
490  */
491 double * amok_bw_matrix(xbt_dynar_t peers,
492                         int buf_size_bw, int msg_size_bw, int msg_amount_bw,
493                         double min_duration) { 
494   double sec;
495   /* construction of matrices for bandwith and latency */
496
497
498   int i,j,len=xbt_dynar_length(peers);
499
500   double *matrix_res = xbt_new0(double, len*len);
501   xbt_peer_t p1,p2;
502
503   xbt_dynar_foreach (peers,i,p1) {
504     xbt_dynar_foreach (peers,j,p2) {
505       if (i!=j) {
506         /* Mesurements of Bandwidth */
507         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
508                         buf_size_bw,msg_size_bw,msg_amount_bw,min_duration,
509                         &sec,&matrix_res[i*len + j]);
510       } 
511     }
512   }
513   return matrix_res;
514 }