Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
32e9890ec05298f97fae336ba3d003bb9fa61cbd
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* amok_bandwidth - Bandwidth tests facilities                              */
2
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "xbt/ex.h"
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw, amok, "Bandwidth testing");
14
15
16 /******************************
17  * Stuff global to the module *
18  ******************************/
19
20 static short _amok_bw_initialized = 0;
21
22 /** @brief module initialization; all participating nodes must run this */
23 void amok_bw_init(void)
24 {
25
26   if (!_amok_bw_initialized) {
27     amok_bw_bw_init();
28     amok_bw_sat_init();
29   }
30
31   amok_bw_bw_join();
32   amok_bw_sat_join();
33
34   _amok_bw_initialized++;
35 }
36
37 /** @brief module finalization */
38 void amok_bw_exit(void)
39 {
40   if (!_amok_bw_initialized)
41     return;
42
43   amok_bw_bw_leave();
44   amok_bw_sat_leave();
45
46   _amok_bw_initialized--;
47 }
48
49 /* ***************************************************************************
50  * Bandwidth tests
51  * ***************************************************************************/
52 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
53 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
54
55 void amok_bw_bw_init()
56 {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc, "peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc, "buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc, "msg_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc, "msg_amount",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc, "min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t", bw_request_desc);
73
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc, "timestamp",
76                               gras_datadesc_by_name("unsigned int"));
77   gras_datadesc_struct_append(bw_res_desc, "seconds",
78                               gras_datadesc_by_name("double"));
79   gras_datadesc_struct_append(bw_res_desc, "bw",
80                               gras_datadesc_by_name("double"));
81   gras_datadesc_struct_close(bw_res_desc);
82   bw_res_desc = gras_datadesc_ref("bw_res_t", bw_res_desc);
83
84   gras_msgtype_declare_rpc("BW handshake", bw_request_desc, bw_request_desc);
85
86   gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL);
87   gras_msgtype_declare("BW stop", NULL);
88
89   gras_msgtype_declare_rpc("BW request", bw_request_desc, bw_res_desc);
90 }
91
92 void amok_bw_bw_join()
93 {
94   gras_cb_register("BW request", &amok_bw_cb_bw_request);
95   gras_cb_register("BW handshake", &amok_bw_cb_bw_handshake);
96 }
97
98 void amok_bw_bw_leave()
99 {
100   gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
101   gras_cb_unregister("BW handshake", &amok_bw_cb_bw_handshake);
102 }
103
104 /**
105  * \brief bandwidth measurement between localhost and \e peer
106  * 
107  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
108  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
109  * \arg msg_size: Size of each message sent. 
110  * \arg msg_amount: Amount of such messages to exchange 
111  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
112  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
113  * \arg bw: observed Bandwidth (in byte/s) 
114  *
115  * Conduct a bandwidth test from the local process to the given peer.
116  * This call is blocking until the end of the experiment.
117  *
118  * If the asked experiment lasts less than \a min_duration, another one will be
119  * launched (and others, if needed). msg_size will be multiplicated by
120  * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
121  * reach the \a min_duration). In that case, the reported bandwidth and
122  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
123  * because we need to malloc a block of this size in RL to conduct the
124  * experiment, and we still don't want to visit the swap. In such case, the 
125  * number of messages is increased instead of their size.
126  *
127  * Results are reported in last args, and sizes are in byte.
128  * 
129  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
130  *           as the total amount of data to send and the msg_size. This
131  *           was changed for the fool wanting to send more than MAXINT
132  *           bytes in a fat pipe.
133  * 
134  */
135 void amok_bw_test(gras_socket_t peer,
136                   unsigned long int buf_size,
137                   unsigned long int msg_size,
138                   unsigned long int msg_amount,
139                   double min_duration, /*OUT*/ double *sec, double *bw)
140 {
141
142   /* Measurement sockets for the experiments */
143   gras_socket_t measMasterIn = NULL, measIn, measOut = NULL;
144   int port;
145   bw_request_t request, request_ack;
146   xbt_ex_t e;
147   int first_pass;
148
149   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
150     TRY {
151       measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
152     }
153     CATCH(e) {
154       measMasterIn = NULL;
155       if (port == 10000 - 1) {
156         RETHROW0("Error caught while opening a measurement socket: %s");
157       } else {
158         xbt_ex_free(e);
159       }
160     }
161   }
162
163   request = xbt_new0(s_bw_request_t, 1);
164   request->buf_size = buf_size;
165   request->msg_size = msg_size;
166   request->msg_amount = msg_amount;
167   request->peer.name = NULL;
168   request->peer.port = gras_socket_my_port(measMasterIn);
169   DEBUG6
170     ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
171      gras_socket_peer_name(peer), gras_socket_peer_port(peer),
172      request->peer.port, request->buf_size, request->msg_size,
173      request->msg_amount);
174
175   TRY {
176     gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
177   }
178   CATCH(e) {
179     RETHROW0("Error encountered while sending the BW request: %s");
180   }
181   measIn = gras_socket_meas_accept(measMasterIn);
182
183   TRY {
184     measOut = gras_socket_client_ext(gras_socket_peer_name(peer),
185                                      request_ack->peer.port,
186                                      request->buf_size, 1);
187   }
188   CATCH(e) {
189     RETHROW2
190       ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
191        gras_socket_peer_name(peer), request_ack->peer.port);
192   }
193   DEBUG2("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
194          request->msg_size, request->msg_amount);
195
196   *sec = 0;
197   first_pass = 1;
198   do {
199     if (first_pass == 0) {
200       double meas_duration = *sec;
201       double increase;
202       if (*sec != 0.0) {
203         increase = (min_duration / meas_duration) * 1.1;
204       } else {
205         increase = 4;
206       }
207       /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
208       if (increase > 20)
209         increase = 20;
210
211       request->msg_size = request->msg_size * increase;
212
213       /* Do not do too large experiments messages or the sensors 
214          will start to swap to store one of them.
215          And then increase the number of messages to compensate (check for overflow there, too) */
216       if (request->msg_size > 64 * 1024 * 1024) {
217         unsigned long int new_amount =
218           ((request->msg_size / ((double) 64 * 1024 * 1024))
219            * request->msg_amount) + 1;
220
221         xbt_assert0(new_amount > request->msg_amount,
222                     "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
223         request->msg_amount = new_amount;
224
225         request->msg_size = 64 * 1024 * 1024;
226       }
227
228       VERB5
229         ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
230          meas_duration, min_duration, request->msg_size, request->msg_amount,
231          ((double) request->msg_size) * ((double) request->msg_amount /
232                                          (*sec) / 1024.0 / 1024.0));
233
234       gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
235     }
236
237     first_pass = 0;
238     *sec = gras_os_time();
239     TRY {
240       gras_socket_meas_send(measOut, 120, request->msg_size,
241                             request->msg_amount);
242       DEBUG0("Data sent. Wait ACK");
243       gras_socket_meas_recv(measIn, 120, 1, 1);
244     } CATCH(e) {
245       gras_socket_close(measOut);
246       gras_socket_close(measMasterIn);
247       gras_socket_close(measIn);
248       RETHROW0("Unable to conduct the experiment: %s");
249     }
250     *sec = gras_os_time() - *sec;
251     if (*sec != 0.0) {
252       *bw =
253         ((double) request->msg_size) * ((double) request->msg_amount) /
254         (*sec);
255     }
256     DEBUG1("Experiment done ; it took %f sec", *sec);
257     if (*sec <= 0) {
258       CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
259     }
260
261   } while (*sec < min_duration);
262
263   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
264          *sec, *bw);
265   gras_msg_send(peer, "BW stop", NULL);
266
267   free(request_ack);
268   free(request);
269   if (measIn != measMasterIn)
270     gras_socket_close(measIn);
271   gras_socket_close(measMasterIn);
272   gras_socket_close(measOut);
273 }
274
275
276 /* Callback to the "BW handshake" message: 
277    opens a server measurement socket,
278    indicate its port in an "BW handshaked" message,
279    receive the corresponding data on the measurement socket, 
280    close the measurement socket
281
282    sizes are in byte
283 */
284 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload)
285 {
286   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
287   gras_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL;
288   bw_request_t request = *(bw_request_t *) payload;
289   bw_request_t answer;
290   xbt_ex_t e;
291   int port;
292   int tooshort = 1;
293   gras_msg_cb_ctx_t ctx_reask;
294   static xbt_dynar_t msgtwaited = NULL;
295
296   DEBUG5
297     ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
298      gras_socket_peer_name(expeditor), request->peer.port, request->buf_size,
299      request->msg_size, request->msg_amount);
300
301   /* Build our answer */
302   answer = xbt_new0(s_bw_request_t, 1);
303
304   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
305     TRY {
306       measMasterIn = gras_socket_server_ext(port, request->buf_size, 1);
307     }
308     CATCH(e) {
309       measMasterIn = NULL;
310       if (port < 10000)
311         xbt_ex_free(e);
312       else
313         /* FIXME: tell error to remote */
314         RETHROW0
315           ("Error encountered while opening a measurement server socket: %s");
316     }
317   }
318
319   answer->buf_size = request->buf_size;
320   answer->msg_size = request->msg_size;
321   answer->msg_amount = request->msg_amount;
322   answer->peer.port = gras_socket_my_port(measMasterIn);
323
324   TRY {
325     gras_msg_rpcreturn(60, ctx, &answer);
326   }
327   CATCH(e) {
328     gras_socket_close(measMasterIn);
329     /* FIXME: tell error to remote */
330     RETHROW0("Error encountered while sending the answer: %s");
331   }
332
333
334   /* Don't connect asap to leave time to other side to enter the accept() */
335   TRY {
336     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
337                                      request->peer.port,
338                                      request->buf_size, 1);
339   }
340   CATCH(e) {
341     RETHROW2
342       ("Error encountered while opening a measurement socket back to %s:%d : %s",
343        gras_socket_peer_name(expeditor), request->peer.port);
344     /* FIXME: tell error to remote */
345   }
346
347   TRY {
348     measIn = gras_socket_meas_accept(measMasterIn);
349     DEBUG4
350       ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
351        answer->buf_size, answer->msg_size, answer->msg_amount,
352        answer->peer.port);
353   }
354   CATCH(e) {
355     gras_socket_close(measMasterIn);
356     gras_socket_close(measIn);
357     gras_socket_close(measOut);
358     /* FIXME: tell error to remote ? */
359     RETHROW0("Error encountered while opening the meas socket: %s");
360   }
361
362   if (!msgtwaited) {
363     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t), NULL);
364     xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW stop"));
365     xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW reask"));
366   }
367
368   while (tooshort) {
369     void *payload;
370     int msggot;
371     TRY {
372       gras_socket_meas_recv(measIn, 120, request->msg_size,
373                             request->msg_amount);
374       gras_socket_meas_send(measOut, 120, 1, 1);
375     } CATCH(e) {
376       gras_socket_close(measMasterIn);
377       gras_socket_close(measIn);
378       gras_socket_close(measOut);
379       /* FIXME: tell error to remote ? */
380       RETHROW0("Error encountered while receiving the experiment: %s");
381     }
382     gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payload);
383     switch (msggot) {
384     case 0:                    /* BW stop */
385       tooshort = 0;
386       break;
387     case 1:                    /* BW reask */
388       tooshort = 1;
389       free(request);
390       request = (bw_request_t) payload;
391       VERB0("Return the reasking RPC");
392       gras_msg_rpcreturn(60, ctx_reask, NULL);
393     }
394     gras_msg_cb_ctx_free(ctx_reask);
395   }
396
397   if (measIn != measMasterIn)
398     gras_socket_close(measMasterIn);
399   gras_socket_close(measIn);
400   gras_socket_close(measOut);
401   free(answer);
402   free(request);
403   VERB0("BW experiment done.");
404   return 0;
405 }
406
407 /**
408  * \brief request a bandwidth measurement between two remote peers
409  *
410  * \arg from_name: Name of the first peer 
411  * \arg from_port: port on which the first process is listening for messages
412  * \arg to_name: Name of the second peer 
413  * \arg to_port: port on which the second process is listening (for messages, do not 
414  * give a measurement socket here. The needed measurement sockets will be created 
415  * automatically and negociated between the peers)
416  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
417  * \arg msg_size: Size of each message sent. 
418  * \arg msg_amount: Amount of such data to exchange
419  * \arg sec: where the result (in seconds) should be stored.
420  * \arg bw: observed Bandwidth (in byte/s)
421  *
422  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
423  * This call is blocking until the end of the experiment.
424  *
425  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
426  *           as the total amount of data to send and the msg_size. This
427  *           was changed for the fool wanting to send more than MAXINT
428  *           bytes in a fat pipe.
429  * 
430  * Results are reported in last args, and sizes are in bytes.
431  */
432 void amok_bw_request(const char *from_name, unsigned int from_port,
433                      const char *to_name, unsigned int to_port,
434                      unsigned long int buf_size,
435                      unsigned long int msg_size,
436                      unsigned long int msg_amount,
437                      double min_duration, /*OUT*/ double *sec, double *bw)
438 {
439
440   gras_socket_t sock;
441   /* The request */
442   bw_request_t request;
443   bw_res_t result;
444   request = xbt_new0(s_bw_request_t, 1);
445   request->buf_size = buf_size;
446   request->msg_size = msg_size;
447   request->msg_amount = msg_amount;
448   request->min_duration = min_duration;
449
450
451   request->peer.name = (char *) to_name;
452   request->peer.port = to_port;
453
454
455   sock = gras_socket_client(from_name, from_port);
456
457
458
459   DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name, from_port,
460          to_name, to_port);
461   gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result);
462
463   if (sec)
464     *sec = result->sec;
465   if (bw)
466     *bw = result->bw;
467
468   VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
469         from_name, from_port, to_name, to_port,
470         result->sec, ((double) result->bw) / 1024.0);
471
472   gras_socket_close(sock);
473   free(result);
474   free(request);
475 }
476
477 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload)
478 {
479
480   /* specification of the test to run, and our answer */
481   bw_request_t request = *(bw_request_t *) payload;
482   bw_res_t result = xbt_new0(s_bw_res_t, 1);
483   gras_socket_t peer, asker;
484
485   asker = gras_msg_cb_ctx_from(ctx);
486   VERB6("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
487         gras_socket_peer_name(asker), gras_socket_peer_port(asker),
488         request->peer.name, request->peer.port,
489         request->msg_size, request->msg_amount);
490   peer = gras_socket_client(request->peer.name, request->peer.port);
491   amok_bw_test(peer,
492                request->buf_size, request->msg_size, request->msg_amount,
493                request->min_duration, &(result->sec), &(result->bw));
494
495   gras_msg_rpcreturn(240, ctx, &result);
496
497   gras_os_sleep(1);
498   gras_socket_close(peer);      /* FIXME: it should be blocking in RL until everything is sent */
499   free(request->peer.name);
500   free(request);
501   free(result);
502
503   return 0;
504 }
505
506 /** \brief builds a matrix of results of bandwidth measurement
507  * 
508  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
509  *           as the total amount of data to send and the msg_size. This
510  *           was changed for the fool wanting to send more than MAXINT
511  *           bytes in a fat pipe.
512  */
513 double *amok_bw_matrix(xbt_dynar_t peers,
514                        int buf_size_bw, int msg_size_bw, int msg_amount_bw,
515                        double min_duration)
516 {
517   double sec;
518   /* construction of matrices for bandwith and latency */
519
520
521   unsigned int i, j;
522   int len = xbt_dynar_length(peers);
523
524   double *matrix_res = xbt_new0(double, len * len);
525   xbt_peer_t p1, p2;
526
527   xbt_dynar_foreach(peers, i, p1) {
528     xbt_dynar_foreach(peers, j, p2) {
529       if (i != j) {
530         /* Mesurements of Bandwidth */
531         amok_bw_request(p1->name, p1->port, p2->name, p2->port,
532                         buf_size_bw, msg_size_bw, msg_amount_bw, min_duration,
533                         &sec, &matrix_res[i * len + j]);
534       }
535     }
536   }
537   return matrix_res;
538 }