Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
new tracing mask TRACE_VOLUME to trace the msg tasks communication size and group...
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-6 Martin Quinson.                                     */
6 /* Copyright (c) 2006   Ahmed Harbaoui.                                     */
7 /* All rights reserved.                                                     */
8
9 /* This program is free software; you can redistribute it and/or modify it
10  * under the terms of the license (GNU LGPL) which comes with this package. */
11
12 #include "xbt/ex.h"
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw, amok, "Bandwidth testing");
17
18
19 /******************************
20  * Stuff global to the module *
21  ******************************/
22
23 static short _amok_bw_initialized = 0;
24
25 /** @brief module initialization; all participating nodes must run this */
26 void amok_bw_init(void)
27 {
28
29   if (!_amok_bw_initialized) {
30     amok_bw_bw_init();
31     amok_bw_sat_init();
32   }
33
34   amok_bw_bw_join();
35   amok_bw_sat_join();
36
37   _amok_bw_initialized++;
38 }
39
40 /** @brief module finalization */
41 void amok_bw_exit(void)
42 {
43   if (!_amok_bw_initialized)
44     return;
45
46   amok_bw_bw_leave();
47   amok_bw_sat_leave();
48
49   _amok_bw_initialized--;
50 }
51
52 /* ***************************************************************************
53  * Bandwidth tests
54  * ***************************************************************************/
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
57
58 void amok_bw_bw_init()
59 {
60   gras_datadesc_type_t bw_request_desc, bw_res_desc;
61
62   /* Build the Bandwidth datatype descriptions */
63   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
64   gras_datadesc_struct_append(bw_request_desc, "peer",
65                               gras_datadesc_by_name("s_xbt_peer_t"));
66   gras_datadesc_struct_append(bw_request_desc, "buf_size",
67                               gras_datadesc_by_name("unsigned long int"));
68   gras_datadesc_struct_append(bw_request_desc, "msg_size",
69                               gras_datadesc_by_name("unsigned long int"));
70   gras_datadesc_struct_append(bw_request_desc, "msg_amount",
71                               gras_datadesc_by_name("unsigned long int"));
72   gras_datadesc_struct_append(bw_request_desc, "min_duration",
73                               gras_datadesc_by_name("double"));
74   gras_datadesc_struct_close(bw_request_desc);
75   bw_request_desc = gras_datadesc_ref("bw_request_t", bw_request_desc);
76
77   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
78   gras_datadesc_struct_append(bw_res_desc, "timestamp",
79                               gras_datadesc_by_name("unsigned int"));
80   gras_datadesc_struct_append(bw_res_desc, "seconds",
81                               gras_datadesc_by_name("double"));
82   gras_datadesc_struct_append(bw_res_desc, "bw",
83                               gras_datadesc_by_name("double"));
84   gras_datadesc_struct_close(bw_res_desc);
85   bw_res_desc = gras_datadesc_ref("bw_res_t", bw_res_desc);
86
87   gras_msgtype_declare_rpc("BW handshake", bw_request_desc, bw_request_desc);
88
89   gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL);
90   gras_msgtype_declare("BW stop", NULL);
91
92   gras_msgtype_declare_rpc("BW request", bw_request_desc, bw_res_desc);
93 }
94
95 void amok_bw_bw_join()
96 {
97   gras_cb_register("BW request", &amok_bw_cb_bw_request);
98   gras_cb_register("BW handshake", &amok_bw_cb_bw_handshake);
99 }
100
101 void amok_bw_bw_leave()
102 {
103   gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
104   gras_cb_unregister("BW handshake", &amok_bw_cb_bw_handshake);
105 }
106
107 /**
108  * \brief bandwidth measurement between localhost and \e peer
109  * 
110  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
111  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
112  * \arg msg_size: Size of each message sent. 
113  * \arg msg_amount: Amount of such messages to exchange 
114  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
115  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
116  * \arg bw: observed Bandwidth (in byte/s) 
117  *
118  * Conduct a bandwidth test from the local process to the given peer.
119  * This call is blocking until the end of the experiment.
120  *
121  * If the asked experiment lasts less than \a min_duration, another one will be
122  * launched (and others, if needed). msg_size will be multiplicated by
123  * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
124  * reach the \a min_duration). In that case, the reported bandwidth and
125  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
126  * because we need to malloc a block of this size in RL to conduct the
127  * experiment, and we still don't want to visit the swap. In such case, the 
128  * number of messages is increased instead of their size.
129  *
130  * Results are reported in last args, and sizes are in byte.
131  * 
132  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
133  *           as the total amount of data to send and the msg_size. This
134  *           was changed for the fool wanting to send more than MAXINT
135  *           bytes in a fat pipe.
136  * 
137  */
138 void amok_bw_test(gras_socket_t peer,
139                   unsigned long int buf_size,
140                   unsigned long int msg_size,
141                   unsigned long int msg_amount,
142                   double min_duration, /*OUT*/ double *sec, double *bw)
143 {
144
145   /* Measurement sockets for the experiments */
146   gras_socket_t measMasterIn = NULL, measIn, measOut = NULL;
147   int port;
148   bw_request_t request, request_ack;
149   xbt_ex_t e;
150   int first_pass;
151
152   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
153     TRY {
154       measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
155     }
156     CATCH(e) {
157       measMasterIn = NULL;
158       if (port == 10000 - 1) {
159         RETHROW0("Error caught while opening a measurement socket: %s");
160       } else {
161         xbt_ex_free(e);
162       }
163     }
164   }
165
166   request = xbt_new0(s_bw_request_t, 1);
167   request->buf_size = buf_size;
168   request->msg_size = msg_size;
169   request->msg_amount = msg_amount;
170   request->peer.name = NULL;
171   request->peer.port = gras_socket_my_port(measMasterIn);
172   DEBUG6
173     ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
174      gras_socket_peer_name(peer), gras_socket_peer_port(peer),
175      request->peer.port, request->buf_size, request->msg_size,
176      request->msg_amount);
177
178   TRY {
179     gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
180   }
181   CATCH(e) {
182     RETHROW0("Error encountered while sending the BW request: %s");
183   }
184   measIn = gras_socket_meas_accept(measMasterIn);
185
186   TRY {
187     measOut = gras_socket_client_ext(gras_socket_peer_name(peer),
188                                      request_ack->peer.port,
189                                      request->buf_size, 1);
190   }
191   CATCH(e) {
192     RETHROW2
193       ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
194        gras_socket_peer_name(peer), request_ack->peer.port);
195   }
196   DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
197          request->msg_size, request->msg_amount);
198
199   *sec = 0;
200   first_pass = 1;
201   do {
202     if (first_pass == 0) {
203       double meas_duration = *sec;
204       double increase;
205       if (*sec != 0.0) {
206         increase = (min_duration / meas_duration) * 1.1;
207       } else {
208         increase = 4;
209       }
210       /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
211       if (increase > 20)
212         increase = 20;
213
214       request->msg_size = request->msg_size * increase;
215
216       /* Do not do too large experiments messages or the sensors 
217          will start to swap to store one of them.
218          And then increase the number of messages to compensate (check for overflow there, too) */
219       if (request->msg_size > 64 * 1024 * 1024) {
220         unsigned long int new_amount =
221           ((request->msg_size / ((double) 64 * 1024 * 1024))
222            * request->msg_amount) + 1;
223
224         xbt_assert0(new_amount > request->msg_amount,
225                     "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
226         request->msg_amount = new_amount;
227
228         request->msg_size = 64 * 1024 * 1024;
229       }
230
231       VERB5
232         ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
233          meas_duration, min_duration, request->msg_size, request->msg_amount,
234          ((double) request->msg_size) * ((double) request->msg_amount /
235                                          (*sec) / 1024.0 / 1024.0));
236
237       gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
238     }
239
240     first_pass = 0;
241     *sec = gras_os_time();
242     TRY {
243       gras_socket_meas_send(measOut, 120, request->msg_size,
244                             request->msg_amount);
245       DEBUG0("Data sent. Wait ACK");
246       gras_socket_meas_recv(measIn, 120, 1, 1);
247     } CATCH(e) {
248       gras_socket_close(measOut);
249       gras_socket_close(measMasterIn);
250       gras_socket_close(measIn);
251       RETHROW0("Unable to conduct the experiment: %s");
252     }
253     *sec = gras_os_time() - *sec;
254     if (*sec != 0.0) {
255       *bw =
256         ((double) request->msg_size) * ((double) request->msg_amount) /
257         (*sec);
258     }
259     DEBUG1("Experiment done ; it took %f sec", *sec);
260     if (*sec <= 0) {
261       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
262     }
263
264   } while (*sec < min_duration);
265
266   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
267          *sec, *bw);
268   gras_msg_send(peer, "BW stop", NULL);
269
270   free(request_ack);
271   free(request);
272   if (measIn != measMasterIn)
273     gras_socket_close(measIn);
274   gras_socket_close(measMasterIn);
275   gras_socket_close(measOut);
276 }
277
278
279 /* Callback to the "BW handshake" message: 
280    opens a server measurement socket,
281    indicate its port in an "BW handshaked" message,
282    receive the corresponding data on the measurement socket, 
283    close the measurement socket
284
285    sizes are in byte
286 */
287 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload)
288 {
289   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
290   gras_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL;
291   bw_request_t request = *(bw_request_t *) payload;
292   bw_request_t answer;
293   xbt_ex_t e;
294   int port;
295   int tooshort = 1;
296   gras_msg_cb_ctx_t ctx_reask;
297   static xbt_dynar_t msgtwaited = NULL;
298
299   DEBUG5
300     ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
301      gras_socket_peer_name(expeditor), request->peer.port, request->buf_size,
302      request->msg_size, request->msg_amount);
303
304   /* Build our answer */
305   answer = xbt_new0(s_bw_request_t, 1);
306
307   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
308     TRY {
309       measMasterIn = gras_socket_server_ext(port, request->buf_size, 1);
310     }
311     CATCH(e) {
312       measMasterIn = NULL;
313       if (port < 10000)
314         xbt_ex_free(e);
315       else
316         /* FIXME: tell error to remote */
317         RETHROW0
318           ("Error encountered while opening a measurement server socket: %s");
319     }
320   }
321
322   answer->buf_size = request->buf_size;
323   answer->msg_size = request->msg_size;
324   answer->msg_amount = request->msg_amount;
325   answer->peer.port = gras_socket_my_port(measMasterIn);
326
327   TRY {
328     gras_msg_rpcreturn(60, ctx, &answer);
329   }
330   CATCH(e) {
331     gras_socket_close(measMasterIn);
332     /* FIXME: tell error to remote */
333     RETHROW0("Error encountered while sending the answer: %s");
334   }
335
336
337   /* Don't connect asap to leave time to other side to enter the accept() */
338   TRY {
339     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
340                                      request->peer.port,
341                                      request->buf_size, 1);
342   }
343   CATCH(e) {
344     RETHROW2
345       ("Error encountered while opening a measurement socket back to %s:%d : %s",
346        gras_socket_peer_name(expeditor), request->peer.port);
347     /* FIXME: tell error to remote */
348   }
349
350   TRY {
351     measIn = gras_socket_meas_accept(measMasterIn);
352     DEBUG4
353       ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
354        answer->buf_size, answer->msg_size, answer->msg_amount,
355        answer->peer.port);
356   }
357   CATCH(e) {
358     gras_socket_close(measMasterIn);
359     gras_socket_close(measIn);
360     gras_socket_close(measOut);
361     /* FIXME: tell error to remote ? */
362     RETHROW0("Error encountered while opening the meas socket: %s");
363   }
364
365   if (!msgtwaited) {
366     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t), NULL);
367     xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW stop"));
368     xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW reask"));
369   }
370
371   while (tooshort) {
372     void *payload;
373     int msggot;
374     TRY {
375       gras_socket_meas_recv(measIn, 120, request->msg_size,
376                             request->msg_amount);
377       gras_socket_meas_send(measOut, 120, 1, 1);
378     } CATCH(e) {
379       gras_socket_close(measMasterIn);
380       gras_socket_close(measIn);
381       gras_socket_close(measOut);
382       /* FIXME: tell error to remote ? */
383       RETHROW0("Error encountered while receiving the experiment: %s");
384     }
385     gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payload);
386     switch (msggot) {
387     case 0:                    /* BW stop */
388       tooshort = 0;
389       break;
390     case 1:                    /* BW reask */
391       tooshort = 1;
392       free(request);
393       request = (bw_request_t) payload;
394       VERB0("Return the reasking RPC");
395       gras_msg_rpcreturn(60, ctx_reask, NULL);
396     }
397     gras_msg_cb_ctx_free(ctx_reask);
398   }
399
400   if (measIn != measMasterIn)
401     gras_socket_close(measMasterIn);
402   gras_socket_close(measIn);
403   gras_socket_close(measOut);
404   free(answer);
405   free(request);
406   VERB0("BW experiment done.");
407   return 0;
408 }
409
410 /**
411  * \brief request a bandwidth measurement between two remote peers
412  *
413  * \arg from_name: Name of the first peer 
414  * \arg from_port: port on which the first process is listening for messages
415  * \arg to_name: Name of the second peer 
416  * \arg to_port: port on which the second process is listening (for messages, do not 
417  * give a measurement socket here. The needed measurement sockets will be created 
418  * automatically and negociated between the peers)
419  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
420  * \arg msg_size: Size of each message sent. 
421  * \arg msg_amount: Amount of such data to exchange
422  * \arg sec: where the result (in seconds) should be stored.
423  * \arg bw: observed Bandwidth (in byte/s)
424  *
425  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
426  * This call is blocking until the end of the experiment.
427  *
428  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
429  *           as the total amount of data to send and the msg_size. This
430  *           was changed for the fool wanting to send more than MAXINT
431  *           bytes in a fat pipe.
432  * 
433  * Results are reported in last args, and sizes are in bytes.
434  */
435 void amok_bw_request(const char *from_name, unsigned int from_port,
436                      const char *to_name, unsigned int to_port,
437                      unsigned long int buf_size,
438                      unsigned long int msg_size,
439                      unsigned long int msg_amount,
440                      double min_duration, /*OUT*/ double *sec, double *bw)
441 {
442
443   gras_socket_t sock;
444   /* The request */
445   bw_request_t request;
446   bw_res_t result;
447   request = xbt_new0(s_bw_request_t, 1);
448   request->buf_size = buf_size;
449   request->msg_size = msg_size;
450   request->msg_amount = msg_amount;
451   request->min_duration = min_duration;
452
453
454   request->peer.name = (char *) to_name;
455   request->peer.port = to_port;
456
457
458   sock = gras_socket_client(from_name, from_port);
459
460
461
462   DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name, from_port,
463          to_name, to_port);
464   gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result);
465
466   if (sec)
467     *sec = result->sec;
468   if (bw)
469     *bw = result->bw;
470
471   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
472         from_name, from_port, to_name, to_port,
473         result->sec, ((double) result->bw) / 1024.0);
474
475   gras_socket_close(sock);
476   free(result);
477   free(request);
478 }
479
480 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload)
481 {
482
483   /* specification of the test to run, and our answer */
484   bw_request_t request = *(bw_request_t *) payload;
485   bw_res_t result = xbt_new0(s_bw_res_t, 1);
486   gras_socket_t peer, asker;
487
488   asker = gras_msg_cb_ctx_from(ctx);
489   VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
490         gras_socket_peer_name(asker), gras_socket_peer_port(asker),
491         request->peer.name, request->peer.port,
492         request->msg_size, request->msg_amount);
493   peer = gras_socket_client(request->peer.name, request->peer.port);
494   amok_bw_test(peer,
495                request->buf_size, request->msg_size, request->msg_amount,
496                request->min_duration, &(result->sec), &(result->bw));
497
498   gras_msg_rpcreturn(240, ctx, &result);
499
500   gras_os_sleep(1);
501   gras_socket_close(peer);      /* FIXME: it should be blocking in RL until everything is sent */
502   free(request->peer.name);
503   free(request);
504   free(result);
505
506   return 0;
507 }
508
509 /** \brief builds a matrix of results of bandwidth measurement
510  * 
511  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
512  *           as the total amount of data to send and the msg_size. This
513  *           was changed for the fool wanting to send more than MAXINT
514  *           bytes in a fat pipe.
515  */
516 double *amok_bw_matrix(xbt_dynar_t peers,
517                        int buf_size_bw, int msg_size_bw, int msg_amount_bw,
518                        double min_duration)
519 {
520   double sec;
521   /* construction of matrices for bandwith and latency */
522
523
524   unsigned int i, j;
525   int len = xbt_dynar_length(peers);
526
527   double *matrix_res = xbt_new0(double, len * len);
528   xbt_peer_t p1, p2;
529
530   xbt_dynar_foreach(peers, i, p1) {
531     xbt_dynar_foreach(peers, j, p2) {
532       if (i != j) {
533         /* Mesurements of Bandwidth */
534         amok_bw_request(p1->name, p1->port, p2->name, p2->port,
535                         buf_size_bw, msg_size_bw, msg_amount_bw, min_duration,
536                         &sec, &matrix_res[i * len + j]);
537       }
538     }
539   }
540   return matrix_res;
541 }