Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Revert Darina's last changes. This is definitively not the way to go
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   amok_base_init();
29
30   if (! _amok_bw_initialized) {
31     amok_bw_bw_init();
32     amok_bw_sat_init();
33   }
34    
35   amok_bw_bw_join();
36   amok_bw_sat_join();
37
38   _amok_bw_initialized++;
39 }
40
41 /** @brief module finalization */
42 void amok_bw_exit(void) {
43   if (! _amok_bw_initialized)
44     return;
45    
46   amok_bw_bw_leave();
47   amok_bw_sat_leave();
48
49   _amok_bw_initialized--;
50 }
51
52 /* ***************************************************************************
53  * Bandwidth tests
54  * ***************************************************************************/
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
57
58 void amok_bw_bw_init() {
59   gras_datadesc_type_t bw_request_desc, bw_res_desc;
60
61   /* Build the Bandwidth datatype descriptions */ 
62   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63   gras_datadesc_struct_append(bw_request_desc,"peer",
64                               gras_datadesc_by_name("s_xbt_peer_t"));
65   gras_datadesc_struct_append(bw_request_desc,"buf_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"exp_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"msg_size",
70                               gras_datadesc_by_name("unsigned long int"));
71   gras_datadesc_struct_append(bw_request_desc,"min_duration",
72                               gras_datadesc_by_name("double"));
73   gras_datadesc_struct_close(bw_request_desc);
74   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
75   
76   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
77   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
78   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
79   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
80   gras_datadesc_struct_close(bw_res_desc);
81   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
82   
83   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
84
85   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
86   gras_msgtype_declare("BW stop", NULL);
87
88   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
89 }
90 void amok_bw_bw_join() {
91   gras_cb_register(gras_msgtype_by_name("BW request"),
92                    &amok_bw_cb_bw_request);
93   gras_cb_register(gras_msgtype_by_name("BW handshake"),
94                    &amok_bw_cb_bw_handshake);
95 }
96 void amok_bw_bw_leave() {
97   gras_cb_unregister(gras_msgtype_by_name("BW request"),
98                      &amok_bw_cb_bw_request);
99   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
100                      &amok_bw_cb_bw_handshake);
101 }
102
103 /**
104  * \brief bandwidth measurement between localhost and \e peer
105  * 
106  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
107  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
108  * \arg exp_size: Total size of data sent across the network
109  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
110  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
111  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
112  * \arg bw: observed Bandwidth (in byte/s) 
113  *
114  * Conduct a bandwidth test from the local process to the given peer.
115  * This call is blocking until the end of the experiment.
116  *
117  * If the asked experiment lasts less than \a min_duration, another one will be
118  * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
119  * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
120  * reach the \a min_duration). In that case, the reported bandwidth and
121  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
122  * because we need to malloc a block of this size in RL to conduct the
123  * experiment, and we still don't want to visit the swap.
124  *
125  * Results are reported in last args, and sizes are in byte.
126  */
127 void amok_bw_test(gras_socket_t peer,
128                   unsigned long int buf_size,
129                   unsigned long int exp_size,
130                   unsigned long int msg_size,
131                   double min_duration,
132           /*OUT*/ double *sec, double *bw) {
133
134   /* Measurement sockets for the experiments */
135   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
136   int port;
137   bw_request_t request,request_ack;
138   xbt_ex_t e;
139   
140   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
141     TRY {
142       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
143     } CATCH(e) {
144       measMasterIn = NULL;
145       if (port == 10000 -1) {
146         RETHROW0("Error caught while opening a measurement socket: %s");
147       } else {
148         xbt_ex_free(e); 
149       }
150     }
151   }
152   
153   request=xbt_new0(s_bw_request_t,1);
154   request->buf_size=buf_size;
155   request->exp_size=exp_size;
156   request->msg_size=msg_size;
157   request->peer.name = NULL;
158   request->peer.port = gras_socket_my_port(measMasterIn);
159   DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
160         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
161         buf_size,request->buf_size);
162
163   TRY {
164     gras_msg_rpccall(peer,15,
165                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
166   } CATCH(e) {
167     RETHROW0("Error encountered while sending the BW request: %s");
168   }
169   measIn = gras_socket_meas_accept(measMasterIn);
170    
171   TRY {
172     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
173                                    request_ack->peer.port, 
174                                    request->buf_size,1);
175   } CATCH(e) {
176     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
177              gras_socket_peer_name(peer),request_ack->peer.port);
178   }
179   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
180
181   *sec = 0;
182   do {
183     if (*sec>0) {
184       double meas_duration=*sec;
185       request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
186       request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
187       if (request->msg_size > 64*1024*1024)
188         request->msg_size = 64*1024*1024;
189
190       VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
191              meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
192       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
193     }
194
195     *sec=gras_os_time();
196     TRY {
197       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
198       DEBUG0("Data sent. Wait ACK");
199       gras_socket_meas_recv(measIn,120,1,1);
200     } CATCH(e) {
201       gras_socket_close(measOut);
202       gras_socket_close(measMasterIn);
203       gras_socket_close(measIn);
204       RETHROW0("Unable to conduct the experiment: %s");
205     }
206     DEBUG0("Experiment done");
207
208     *sec = gras_os_time() - *sec;
209     *bw = ((double)request->exp_size) / *sec;
210   } while (*sec < min_duration);
211
212   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
213          *sec,*bw);
214   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
215
216   free(request_ack);
217   free(request);
218   if (measIn != measMasterIn)
219     gras_socket_close(measIn);
220   gras_socket_close(measMasterIn);
221   gras_socket_close(measOut);
222 }
223
224
225 /* Callback to the "BW handshake" message: 
226    opens a server measurement socket,
227    indicate its port in an "BW handshaked" message,
228    receive the corresponding data on the measurement socket, 
229    close the measurment socket
230
231    sizes are in byte
232 */
233 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
234                             void          *payload) {
235   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
236   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
237   bw_request_t request=*(bw_request_t*)payload;
238   bw_request_t answer;
239   xbt_ex_t e;
240   int port;
241   int tooshort = 1;
242   gras_msg_cb_ctx_t ctx_reask;
243   static xbt_dynar_t msgtwaited=NULL;
244   
245   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
246         gras_socket_peer_name(expeditor),request->peer.port,
247         request->buf_size,request->exp_size,request->msg_size);     
248
249   /* Build our answer */
250   answer = xbt_new0(s_bw_request_t,1);
251   
252   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
253     TRY {
254       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
255     } CATCH(e) {
256       measMasterIn = NULL;
257       if (port < 10000)
258         xbt_ex_free(e);
259       else
260         /* FIXME: tell error to remote */
261         RETHROW0("Error encountered while opening a measurement server socket: %s");
262     }
263   }
264    
265   answer->buf_size=request->buf_size;
266   answer->exp_size=request->exp_size;
267   answer->msg_size=request->msg_size;
268   answer->peer.port=gras_socket_my_port(measMasterIn);
269
270   TRY {
271     gras_msg_rpcreturn(60,ctx,&answer);
272   } CATCH(e) { 
273     gras_socket_close(measMasterIn);
274     /* FIXME: tell error to remote */
275     RETHROW0("Error encountered while sending the answer: %s");
276   }
277
278
279   /* Don't connect asap to leave time to other side to enter the accept() */
280   TRY {
281     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
282                                      request->peer.port,
283                                      request->buf_size,1);
284   } CATCH(e) {
285     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
286              gras_socket_peer_name(expeditor),request->peer.port);
287     /* FIXME: tell error to remote */
288   }
289
290   TRY {
291     measIn = gras_socket_meas_accept(measMasterIn);
292     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
293            answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
294   } CATCH(e) {
295     gras_socket_close(measMasterIn);
296     gras_socket_close(measIn);
297     gras_socket_close(measOut);
298     /* FIXME: tell error to remote ? */
299     RETHROW0("Error encountered while opening the meas socket: %s");
300   }
301
302   if (!msgtwaited) {
303     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
304     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
305     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
306   }
307
308   while (tooshort) {
309     void *payload;
310     int msggot;
311     TRY {
312       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
313       gras_socket_meas_send(measOut,120,1,1);
314     } CATCH(e) {
315       gras_socket_close(measMasterIn);
316       gras_socket_close(measIn);
317       gras_socket_close(measOut);
318       /* FIXME: tell error to remote ? */
319       RETHROW0("Error encountered while receiving the experiment: %s");
320     }
321     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
322     switch(msggot) {
323     case 0: /* BW stop */
324       tooshort = 0;
325       break;
326     case 1: /* BW reask */
327       tooshort = 1;
328       free(request);
329       request = (bw_request_t)payload;
330       VERB0("Return the reasking RPC");
331       gras_msg_rpcreturn(60,ctx_reask,NULL);
332     }
333     gras_msg_cb_ctx_free(ctx_reask);
334   }
335
336   if (measIn != measMasterIn)
337     gras_socket_close(measMasterIn);
338   gras_socket_close(measIn);
339   gras_socket_close(measOut);
340   free(answer);
341   free(request);
342   VERB0("BW experiment done.");
343   return 1;
344 }
345
346 /**
347  * \brief request a bandwidth measurement between two remote peers
348  *
349  * \arg from_name: Name of the first peer 
350  * \arg from_port: port on which the first process is listening for messages
351  * \arg to_name: Name of the second peer 
352  * \arg to_port: port on which the second process is listening (for messages, do not 
353  * give a measurement socket here. The needed measurement sockets will be created 
354  * automatically and negociated between the peers)
355  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
356  * \arg exp_size: Total size of data sent across the network
357  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
358  * \arg sec: where the result (in seconds) should be stored.
359  * \arg bw: observed Bandwidth (in byte/s)
360  *
361  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
362  * This call is blocking until the end of the experiment.
363  *
364  * Results are reported in last args, and sizes are in bytes.
365  */
366 void amok_bw_request(const char* from_name,unsigned int from_port,
367                      const char* to_name,unsigned int to_port,
368                      unsigned long int buf_size,
369                      unsigned long int exp_size,
370                      unsigned long int msg_size,
371                      double min_duration,
372              /*OUT*/ double *sec, double*bw) {
373   
374   gras_socket_t sock;
375   /* The request */
376   bw_request_t request;
377   bw_res_t result;
378   request=xbt_new0(s_bw_request_t,1);
379   request->buf_size=buf_size;
380   request->exp_size=exp_size;
381   request->msg_size=msg_size;
382   request->min_duration = min_duration;
383
384
385   request->peer.name = (char*)to_name;
386   request->peer.port = to_port;
387
388
389   sock = gras_socket_client(from_name,from_port);
390  
391     
392  
393   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
394   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
395
396   if (sec)
397     *sec=result->sec;
398   if (bw)
399     *bw =result->bw;
400
401   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
402         from_name,from_port, to_name,to_port,
403         result->sec,((double)result->bw)/1024.0);
404
405   gras_socket_close(sock);
406   free(result);
407   free(request);
408 }
409
410 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
411                           void            *payload) {
412                           
413   /* specification of the test to run, and our answer */
414   bw_request_t request = *(bw_request_t*)payload;
415   bw_res_t result = xbt_new0(s_bw_res_t,1);
416   gras_socket_t peer,asker;
417
418   asker=gras_msg_cb_ctx_from(ctx);
419   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
420         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
421
422         request->peer.name,request->peer.port);
423   peer = gras_socket_client(request->peer.name,request->peer.port);
424   amok_bw_test(peer,
425                request->buf_size,request->exp_size,request->msg_size,
426                request->min_duration,
427                &(result->sec),&(result->bw));
428  
429   gras_msg_rpcreturn(240,ctx,&result);
430
431   gras_os_sleep(1);
432   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
433   free(request->peer.name);
434   free(request);
435   free(result);
436   
437   return 1;
438 }
439
440 /** \brief builds a matrix of results of bandwidth measurement */
441 double * amok_bw_matrix(xbt_dynar_t peers,
442                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
443                         double min_duration) { 
444   double sec;
445   /* construction of matrices for bandwith and latency */
446
447
448   int i,j,len=xbt_dynar_length(peers);
449
450   double *matrix_res = xbt_new0(double, len*len);
451   xbt_peer_t p1,p2;
452
453   xbt_dynar_foreach (peers,i,p1) {
454     xbt_dynar_foreach (peers,j,p2) {
455       if (i!=j) {
456         /* Mesurements of Bandwidth */
457         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
458                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
459                         &sec,&matrix_res[i*len + j]);
460       } 
461     }
462   }
463   return matrix_res;
464 }