Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bc56423e41f84c5895658e29e3fb179fe7845047
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void) {
27
28   if (! _amok_bw_initialized) {
29     amok_bw_bw_init();
30     amok_bw_sat_init();
31   }
32    
33   amok_bw_bw_join();
34   amok_bw_sat_join();
35
36   _amok_bw_initialized++;
37 }
38
39 /** @brief module finalization */
40 void amok_bw_exit(void) {
41   if (! _amok_bw_initialized)
42     return;
43    
44   amok_bw_bw_leave();
45   amok_bw_sat_leave();
46
47   _amok_bw_initialized--;
48 }
49
50 /* ***************************************************************************
51  * Bandwidth tests
52  * ***************************************************************************/
53 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
54 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
55
56 void amok_bw_bw_init() {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */ 
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc,"peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc,"buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc,"exp_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc,"msg_size",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc,"min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
73   
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
76   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
77   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
78   gras_datadesc_struct_close(bw_res_desc);
79   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
80   
81   gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
82
83   gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
84   gras_msgtype_declare("BW stop", NULL);
85
86   gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
87 }
88 void amok_bw_bw_join() {
89   gras_cb_register(gras_msgtype_by_name("BW request"),
90                    &amok_bw_cb_bw_request);
91   gras_cb_register(gras_msgtype_by_name("BW handshake"),
92                    &amok_bw_cb_bw_handshake);
93 }
94 void amok_bw_bw_leave() {
95   gras_cb_unregister(gras_msgtype_by_name("BW request"),
96                      &amok_bw_cb_bw_request);
97   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
98                      &amok_bw_cb_bw_handshake);
99 }
100
101 /**
102  * \brief bandwidth measurement between localhost and \e peer
103  * 
104  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
105  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
106  * \arg exp_size: Total size of data sent across the network
107  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
108  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
109  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
110  * \arg bw: observed Bandwidth (in byte/s) 
111  *
112  * Conduct a bandwidth test from the local process to the given peer.
113  * This call is blocking until the end of the experiment.
114  *
115  * If the asked experiment lasts less than \a min_duration, another one will be
116  * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
117  * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
118  * reach the \a min_duration). In that case, the reported bandwidth and
119  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
120  * because we need to malloc a block of this size in RL to conduct the
121  * experiment, and we still don't want to visit the swap.
122  *
123  * Results are reported in last args, and sizes are in byte.
124  */
125 void amok_bw_test(gras_socket_t peer,
126                   unsigned long int buf_size,
127                   unsigned long int exp_size,
128                   unsigned long int msg_size,
129                   double min_duration,
130           /*OUT*/ double *sec, double *bw) {
131
132   /* Measurement sockets for the experiments */
133   gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
134   int port;
135   bw_request_t request,request_ack;
136   xbt_ex_t e;
137   
138   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
139     TRY {
140       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
141     } CATCH(e) {
142       measMasterIn = NULL;
143       if (port == 10000 -1) {
144         RETHROW0("Error caught while opening a measurement socket: %s");
145       } else {
146         xbt_ex_free(e); 
147       }
148     }
149   }
150   
151   request=xbt_new0(s_bw_request_t,1);
152   request->buf_size=buf_size;
153   request->exp_size=exp_size;
154   request->msg_size=msg_size;
155   request->peer.name = NULL;
156   request->peer.port = gras_socket_my_port(measMasterIn);
157   DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
158         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
159         buf_size,request->buf_size);
160
161   TRY {
162     gras_msg_rpccall(peer,15,
163                      gras_msgtype_by_name("BW handshake"),&request, &request_ack);
164   } CATCH(e) {
165     RETHROW0("Error encountered while sending the BW request: %s");
166   }
167   measIn = gras_socket_meas_accept(measMasterIn);
168    
169   TRY {
170     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
171                                    request_ack->peer.port, 
172                                    request->buf_size,1);
173   } CATCH(e) {
174     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
175              gras_socket_peer_name(peer),request_ack->peer.port);
176   }
177   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
178
179   *sec = 0;
180   do {
181     if (*sec>0) {
182       double meas_duration=*sec;
183       request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
184       request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
185       if (request->msg_size > 64*1024*1024)
186         request->msg_size = 64*1024*1024;
187
188       VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
189              meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
190       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
191     }
192
193     *sec=gras_os_time();
194     TRY {
195       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
196       DEBUG0("Data sent. Wait ACK");
197       gras_socket_meas_recv(measIn,120,1,1);
198     } CATCH(e) {
199       gras_socket_close(measOut);
200       gras_socket_close(measMasterIn);
201       gras_socket_close(measIn);
202       RETHROW0("Unable to conduct the experiment: %s");
203     }
204     DEBUG0("Experiment done");
205
206     *sec = gras_os_time() - *sec;
207     *bw = ((double)request->exp_size) / *sec;
208   } while (*sec < min_duration);
209
210   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
211          *sec,*bw);
212   gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);      
213
214   free(request_ack);
215   free(request);
216   if (measIn != measMasterIn)
217     gras_socket_close(measIn);
218   gras_socket_close(measMasterIn);
219   gras_socket_close(measOut);
220 }
221
222
223 /* Callback to the "BW handshake" message: 
224    opens a server measurement socket,
225    indicate its port in an "BW handshaked" message,
226    receive the corresponding data on the measurement socket, 
227    close the measurment socket
228
229    sizes are in byte
230 */
231 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
232                             void          *payload) {
233   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
234   gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
235   bw_request_t request=*(bw_request_t*)payload;
236   bw_request_t answer;
237   xbt_ex_t e;
238   int port;
239   int tooshort = 1;
240   gras_msg_cb_ctx_t ctx_reask;
241   static xbt_dynar_t msgtwaited=NULL;
242   
243   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
244         gras_socket_peer_name(expeditor),request->peer.port,
245         request->buf_size,request->exp_size,request->msg_size);     
246
247   /* Build our answer */
248   answer = xbt_new0(s_bw_request_t,1);
249   
250   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
251     TRY {
252       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
253     } CATCH(e) {
254       measMasterIn = NULL;
255       if (port < 10000)
256         xbt_ex_free(e);
257       else
258         /* FIXME: tell error to remote */
259         RETHROW0("Error encountered while opening a measurement server socket: %s");
260     }
261   }
262    
263   answer->buf_size=request->buf_size;
264   answer->exp_size=request->exp_size;
265   answer->msg_size=request->msg_size;
266   answer->peer.port=gras_socket_my_port(measMasterIn);
267
268   TRY {
269     gras_msg_rpcreturn(60,ctx,&answer);
270   } CATCH(e) { 
271     gras_socket_close(measMasterIn);
272     /* FIXME: tell error to remote */
273     RETHROW0("Error encountered while sending the answer: %s");
274   }
275
276
277   /* Don't connect asap to leave time to other side to enter the accept() */
278   TRY {
279     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
280                                      request->peer.port,
281                                      request->buf_size,1);
282   } CATCH(e) {
283     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
284              gras_socket_peer_name(expeditor),request->peer.port);
285     /* FIXME: tell error to remote */
286   }
287
288   TRY {
289     measIn = gras_socket_meas_accept(measMasterIn);
290     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
291            answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
292   } CATCH(e) {
293     gras_socket_close(measMasterIn);
294     gras_socket_close(measIn);
295     gras_socket_close(measOut);
296     /* FIXME: tell error to remote ? */
297     RETHROW0("Error encountered while opening the meas socket: %s");
298   }
299
300   if (!msgtwaited) {
301     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
302     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
303     xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
304   }
305
306   while (tooshort) {
307     void *payload;
308     int msggot;
309     TRY {
310       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
311       gras_socket_meas_send(measOut,120,1,1);
312     } CATCH(e) {
313       gras_socket_close(measMasterIn);
314       gras_socket_close(measIn);
315       gras_socket_close(measOut);
316       /* FIXME: tell error to remote ? */
317       RETHROW0("Error encountered while receiving the experiment: %s");
318     }
319     gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
320     switch(msggot) {
321     case 0: /* BW stop */
322       tooshort = 0;
323       break;
324     case 1: /* BW reask */
325       tooshort = 1;
326       free(request);
327       request = (bw_request_t)payload;
328       VERB0("Return the reasking RPC");
329       gras_msg_rpcreturn(60,ctx_reask,NULL);
330     }
331     gras_msg_cb_ctx_free(ctx_reask);
332   }
333
334   if (measIn != measMasterIn)
335     gras_socket_close(measMasterIn);
336   gras_socket_close(measIn);
337   gras_socket_close(measOut);
338   free(answer);
339   free(request);
340   VERB0("BW experiment done.");
341   return 1;
342 }
343
344 /**
345  * \brief request a bandwidth measurement between two remote peers
346  *
347  * \arg from_name: Name of the first peer 
348  * \arg from_port: port on which the first process is listening for messages
349  * \arg to_name: Name of the second peer 
350  * \arg to_port: port on which the second process is listening (for messages, do not 
351  * give a measurement socket here. The needed measurement sockets will be created 
352  * automatically and negociated between the peers)
353  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
354  * \arg exp_size: Total size of data sent across the network
355  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
356  * \arg sec: where the result (in seconds) should be stored.
357  * \arg bw: observed Bandwidth (in byte/s)
358  *
359  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
360  * This call is blocking until the end of the experiment.
361  *
362  * Results are reported in last args, and sizes are in bytes.
363  */
364 void amok_bw_request(const char* from_name,unsigned int from_port,
365                      const char* to_name,unsigned int to_port,
366                      unsigned long int buf_size,
367                      unsigned long int exp_size,
368                      unsigned long int msg_size,
369                      double min_duration,
370              /*OUT*/ double *sec, double*bw) {
371   
372   gras_socket_t sock;
373   /* The request */
374   bw_request_t request;
375   bw_res_t result;
376   request=xbt_new0(s_bw_request_t,1);
377   request->buf_size=buf_size;
378   request->exp_size=exp_size;
379   request->msg_size=msg_size;
380   request->min_duration = min_duration;
381
382
383   request->peer.name = (char*)to_name;
384   request->peer.port = to_port;
385
386
387   sock = gras_socket_client(from_name,from_port);
388  
389     
390  
391   DEBUG4("Ask for a BW test between %s:%d and %s:%d",   from_name,from_port, to_name,to_port);
392   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
393
394   if (sec)
395     *sec=result->sec;
396   if (bw)
397     *bw =result->bw;
398
399   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
400         from_name,from_port, to_name,to_port,
401         result->sec,((double)result->bw)/1024.0);
402
403   gras_socket_close(sock);
404   free(result);
405   free(request);
406 }
407
408 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
409                           void            *payload) {
410                           
411   /* specification of the test to run, and our answer */
412   bw_request_t request = *(bw_request_t*)payload;
413   bw_res_t result = xbt_new0(s_bw_res_t,1);
414   gras_socket_t peer,asker;
415
416   asker=gras_msg_cb_ctx_from(ctx);
417   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d", 
418         gras_socket_peer_name(asker),gras_socket_peer_port(asker),
419
420         request->peer.name,request->peer.port);
421   peer = gras_socket_client(request->peer.name,request->peer.port);
422   amok_bw_test(peer,
423                request->buf_size,request->exp_size,request->msg_size,
424                request->min_duration,
425                &(result->sec),&(result->bw));
426  
427   gras_msg_rpcreturn(240,ctx,&result);
428
429   gras_os_sleep(1);
430   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
431   free(request->peer.name);
432   free(request);
433   free(result);
434   
435   return 1;
436 }
437
438 /** \brief builds a matrix of results of bandwidth measurement */
439 double * amok_bw_matrix(xbt_dynar_t peers,
440                         int buf_size_bw, int exp_size_bw, int msg_size_bw,
441                         double min_duration) { 
442   double sec;
443   /* construction of matrices for bandwith and latency */
444
445
446   int i,j,len=xbt_dynar_length(peers);
447
448   double *matrix_res = xbt_new0(double, len*len);
449   xbt_peer_t p1,p2;
450
451   xbt_dynar_foreach (peers,i,p1) {
452     xbt_dynar_foreach (peers,j,p2) {
453       if (i!=j) {
454         /* Mesurements of Bandwidth */
455         amok_bw_request(p1->name,p1->port,p2->name,p2->port,
456                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
457                         &sec,&matrix_res[i*len + j]);
458       } 
459     }
460   }
461   return matrix_res;
462 }