Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove usage of xbt_assert[0-9].
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* amok_bandwidth - Bandwidth tests facilities                              */
2
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "xbt/ex.h"
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw, amok, "Bandwidth testing");
14
15
16 /******************************
17  * Stuff global to the module *
18  ******************************/
19
20 static short _amok_bw_initialized = 0;
21
22 /** @brief module initialization; all participating nodes must run this */
23 void amok_bw_init(void)
24 {
25
26   if (!_amok_bw_initialized) {
27     amok_bw_bw_init();
28     amok_bw_sat_init();
29   }
30
31   amok_bw_bw_join();
32   amok_bw_sat_join();
33
34   _amok_bw_initialized++;
35 }
36
37 /** @brief module finalization */
38 void amok_bw_exit(void)
39 {
40   if (!_amok_bw_initialized)
41     return;
42
43   amok_bw_bw_leave();
44   amok_bw_sat_leave();
45
46   _amok_bw_initialized--;
47 }
48
49 /* ***************************************************************************
50  * Bandwidth tests
51  * ***************************************************************************/
52 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
53 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
54
55 void amok_bw_bw_init()
56 {
57   gras_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */
60   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
61   gras_datadesc_struct_append(bw_request_desc, "peer",
62                               gras_datadesc_by_name("s_xbt_peer_t"));
63   gras_datadesc_struct_append(bw_request_desc, "buf_size",
64                               gras_datadesc_by_name("unsigned long int"));
65   gras_datadesc_struct_append(bw_request_desc, "msg_size",
66                               gras_datadesc_by_name("unsigned long int"));
67   gras_datadesc_struct_append(bw_request_desc, "msg_amount",
68                               gras_datadesc_by_name("unsigned long int"));
69   gras_datadesc_struct_append(bw_request_desc, "min_duration",
70                               gras_datadesc_by_name("double"));
71   gras_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = gras_datadesc_ref("bw_request_t", bw_request_desc);
73
74   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
75   gras_datadesc_struct_append(bw_res_desc, "timestamp",
76                               gras_datadesc_by_name("unsigned int"));
77   gras_datadesc_struct_append(bw_res_desc, "seconds",
78                               gras_datadesc_by_name("double"));
79   gras_datadesc_struct_append(bw_res_desc, "bw",
80                               gras_datadesc_by_name("double"));
81   gras_datadesc_struct_close(bw_res_desc);
82   bw_res_desc = gras_datadesc_ref("bw_res_t", bw_res_desc);
83
84   gras_msgtype_declare_rpc("BW handshake", bw_request_desc,
85                            bw_request_desc);
86
87   gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL);
88   gras_msgtype_declare("BW stop", NULL);
89
90   gras_msgtype_declare_rpc("BW request", bw_request_desc, bw_res_desc);
91 }
92
93 void amok_bw_bw_join()
94 {
95   gras_cb_register("BW request", &amok_bw_cb_bw_request);
96   gras_cb_register("BW handshake", &amok_bw_cb_bw_handshake);
97 }
98
99 void amok_bw_bw_leave()
100 {
101   gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
102   gras_cb_unregister("BW handshake", &amok_bw_cb_bw_handshake);
103 }
104
105 /**
106  * \brief bandwidth measurement between localhost and \e peer
107  * 
108  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
109  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
110  * \arg msg_size: Size of each message sent. 
111  * \arg msg_amount: Amount of such messages to exchange 
112  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
113  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
114  * \arg bw: observed Bandwidth (in byte/s) 
115  *
116  * Conduct a bandwidth test from the local process to the given peer.
117  * This call is blocking until the end of the experiment.
118  *
119  * If the asked experiment lasts less than \a min_duration, another one will be
120  * launched (and others, if needed). msg_size will be multiplicated by
121  * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
122  * reach the \a min_duration). In that case, the reported bandwidth and
123  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
124  * because we need to malloc a block of this size in RL to conduct the
125  * experiment, and we still don't want to visit the swap. In such case, the 
126  * number of messages is increased instead of their size.
127  *
128  * Results are reported in last args, and sizes are in byte.
129  * 
130  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
131  *           as the total amount of data to send and the msg_size. This
132  *           was changed for the fool wanting to send more than MAXINT
133  *           bytes in a fat pipe.
134  * 
135  */
136 void amok_bw_test(gras_socket_t peer,
137                   unsigned long int buf_size,
138                   unsigned long int msg_size,
139                   unsigned long int msg_amount,
140                   double min_duration, /*OUT*/ double *sec, double *bw)
141 {
142
143   /* Measurement sockets for the experiments */
144   gras_socket_t measMasterIn = NULL, measIn, measOut = NULL;
145   int port;
146   bw_request_t request, request_ack;
147   xbt_ex_t e;
148   int first_pass;
149
150   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
151     TRY {
152       measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
153     }
154     CATCH(e) {
155       measMasterIn = NULL;
156       if (port == 10000 - 1) {
157         RETHROWF("Error caught while opening a measurement socket: %s");
158       } else {
159         xbt_ex_free(e);
160       }
161     }
162   }
163
164   request = xbt_new0(s_bw_request_t, 1);
165   request->buf_size = buf_size;
166   request->msg_size = msg_size;
167   request->msg_amount = msg_amount;
168   request->peer.name = NULL;
169   request->peer.port = gras_socket_my_port(measMasterIn);
170   XBT_DEBUG
171       ("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld, msg_size=%ld, msg_amount=%ld)",
172        gras_socket_peer_name(peer), gras_socket_peer_port(peer),
173        request->peer.port, request->buf_size, request->msg_size,
174        request->msg_amount);
175
176   TRY {
177     gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
178   }
179   CATCH(e) {
180     RETHROWF("Error encountered while sending the BW request: %s");
181   }
182   measIn = gras_socket_meas_accept(measMasterIn);
183
184   TRY {
185     measOut = gras_socket_client_ext(gras_socket_peer_name(peer),
186                                      request_ack->peer.port,
187                                      request->buf_size, 1);
188   }
189   CATCH(e) {
190     RETHROWF
191         ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
192          gras_socket_peer_name(peer), request_ack->peer.port);
193   }
194   XBT_DEBUG
195       ("Got ACK; conduct the experiment (msg_size = %ld, msg_amount=%ld)",
196        request->msg_size, request->msg_amount);
197
198   *sec = 0;
199   first_pass = 1;
200   do {
201     if (first_pass == 0) {
202       double meas_duration = *sec;
203       double increase;
204       if (*sec != 0.0) {
205         increase = (min_duration / meas_duration) * 1.1;
206       } else {
207         increase = 4;
208       }
209       /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
210       if (increase > 20)
211         increase = 20;
212
213       request->msg_size = request->msg_size * increase;
214
215       /* Do not do too large experiments messages or the sensors 
216          will start to swap to store one of them.
217          And then increase the number of messages to compensate (check for overflow there, too) */
218       if (request->msg_size > 64 * 1024 * 1024) {
219         unsigned long int new_amount =
220             ((request->msg_size / ((double) 64 * 1024 * 1024))
221              * request->msg_amount) + 1;
222
223         xbt_assert(new_amount > request->msg_amount,
224                     "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
225         request->msg_amount = new_amount;
226
227         request->msg_size = 64 * 1024 * 1024;
228       }
229
230       XBT_VERB
231           ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
232            meas_duration, min_duration, request->msg_size,
233            request->msg_amount,
234            ((double) request->msg_size) * ((double) request->msg_amount /
235                                            (*sec) / 1024.0 / 1024.0));
236
237       gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
238     }
239
240     first_pass = 0;
241     *sec = gras_os_time();
242     TRY {
243       gras_socket_meas_send(measOut, 120, request->msg_size,
244                             request->msg_amount);
245       XBT_DEBUG("Data sent. Wait ACK");
246       gras_socket_meas_recv(measIn, 120, 1, 1);
247     } CATCH(e) {
248       gras_socket_close(measOut);
249       gras_socket_close(measMasterIn);
250       gras_socket_close(measIn);
251       RETHROWF("Unable to conduct the experiment: %s");
252     }
253     *sec = gras_os_time() - *sec;
254     if (*sec != 0.0) {
255       *bw =
256           ((double) request->msg_size) * ((double) request->msg_amount) /
257           (*sec);
258     }
259     XBT_DEBUG("Experiment done ; it took %f sec", *sec);
260     if (*sec <= 0) {
261       XBT_CRITICAL("Nonpositive value (%f) found for BW test time.", *sec);
262     }
263
264   } while (*sec < min_duration);
265
266   XBT_DEBUG
267       ("This measurement was long enough (%f sec; found %f b/s). Stop peer",
268        *sec, *bw);
269   gras_msg_send(peer, "BW stop", NULL);
270
271   free(request_ack);
272   free(request);
273   if (measIn != measMasterIn)
274     gras_socket_close(measIn);
275   gras_socket_close(measMasterIn);
276   gras_socket_close(measOut);
277 }
278
279
280 /* Callback to the "BW handshake" message: 
281    opens a server measurement socket,
282    indicate its port in an "BW handshaked" message,
283    receive the corresponding data on the measurement socket, 
284    close the measurement socket
285
286    sizes are in byte
287 */
288 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload)
289 {
290   gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
291   gras_socket_t measMasterIn = NULL, measIn = NULL, measOut = NULL;
292   bw_request_t request = *(bw_request_t *) payload;
293   bw_request_t answer;
294   xbt_ex_t e;
295   int port;
296   int tooshort = 1;
297   gras_msg_cb_ctx_t ctx_reask;
298   static xbt_dynar_t msgtwaited = NULL;
299
300   XBT_DEBUG
301       ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
302        gras_socket_peer_name(expeditor), request->peer.port,
303        request->buf_size, request->msg_size, request->msg_amount);
304
305   /* Build our answer */
306   answer = xbt_new0(s_bw_request_t, 1);
307
308   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
309     TRY {
310       measMasterIn = gras_socket_server_ext(port, request->buf_size, 1);
311     }
312     CATCH(e) {
313       measMasterIn = NULL;
314       if (port < 10000)
315         xbt_ex_free(e);
316       else
317         /* FIXME: tell error to remote */
318         RETHROWF
319             ("Error encountered while opening a measurement server socket: %s");
320     }
321   }
322
323   answer->buf_size = request->buf_size;
324   answer->msg_size = request->msg_size;
325   answer->msg_amount = request->msg_amount;
326   answer->peer.port = gras_socket_my_port(measMasterIn);
327
328   TRY {
329     gras_msg_rpcreturn(60, ctx, &answer);
330   }
331   CATCH(e) {
332     gras_socket_close(measMasterIn);
333     /* FIXME: tell error to remote */
334     RETHROWF("Error encountered while sending the answer: %s");
335   }
336
337
338   /* Don't connect asap to leave time to other side to enter the accept() */
339   TRY {
340     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
341                                      request->peer.port,
342                                      request->buf_size, 1);
343   }
344   CATCH(e) {
345     RETHROWF
346         ("Error encountered while opening a measurement socket back to %s:%d : %s",
347          gras_socket_peer_name(expeditor), request->peer.port);
348     /* FIXME: tell error to remote */
349   }
350
351   TRY {
352     measIn = gras_socket_meas_accept(measMasterIn);
353     XBT_DEBUG
354         ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
355          answer->buf_size, answer->msg_size, answer->msg_amount,
356          answer->peer.port);
357   }
358   CATCH(e) {
359     gras_socket_close(measMasterIn);
360     gras_socket_close(measIn);
361     gras_socket_close(measOut);
362     /* FIXME: tell error to remote ? */
363     RETHROWF("Error encountered while opening the meas socket: %s");
364   }
365
366   if (!msgtwaited) {
367     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t), NULL);
368     xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW stop"));
369     xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW reask"));
370   }
371
372   while (tooshort) {
373     void *payload;
374     int msggot;
375     TRY {
376       gras_socket_meas_recv(measIn, 120, request->msg_size,
377                             request->msg_amount);
378       gras_socket_meas_send(measOut, 120, 1, 1);
379     } CATCH(e) {
380       gras_socket_close(measMasterIn);
381       gras_socket_close(measIn);
382       gras_socket_close(measOut);
383       /* FIXME: tell error to remote ? */
384       RETHROWF("Error encountered while receiving the experiment: %s");
385     }
386     gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payload);
387     switch (msggot) {
388     case 0:                    /* BW stop */
389       tooshort = 0;
390       break;
391     case 1:                    /* BW reask */
392       tooshort = 1;
393       free(request);
394       request = (bw_request_t) payload;
395       XBT_VERB("Return the reasking RPC");
396       gras_msg_rpcreturn(60, ctx_reask, NULL);
397     }
398     gras_msg_cb_ctx_free(ctx_reask);
399   }
400
401   if (measIn != measMasterIn)
402     gras_socket_close(measMasterIn);
403   gras_socket_close(measIn);
404   gras_socket_close(measOut);
405   free(answer);
406   free(request);
407   XBT_VERB("BW experiment done.");
408   return 0;
409 }
410
411 /**
412  * \brief request a bandwidth measurement between two remote peers
413  *
414  * \arg from_name: Name of the first peer 
415  * \arg from_port: port on which the first process is listening for messages
416  * \arg to_name: Name of the second peer 
417  * \arg to_port: port on which the second process is listening (for messages, do not 
418  * give a measurement socket here. The needed measurement sockets will be created 
419  * automatically and negociated between the peers)
420  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
421  * \arg msg_size: Size of each message sent. 
422  * \arg msg_amount: Amount of such data to exchange
423  * \arg sec: where the result (in seconds) should be stored.
424  * \arg bw: observed Bandwidth (in byte/s)
425  *
426  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
427  * This call is blocking until the end of the experiment.
428  *
429  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
430  *           as the total amount of data to send and the msg_size. This
431  *           was changed for the fool wanting to send more than MAXINT
432  *           bytes in a fat pipe.
433  * 
434  * Results are reported in last args, and sizes are in bytes.
435  */
436 void amok_bw_request(const char *from_name, unsigned int from_port,
437                      const char *to_name, unsigned int to_port,
438                      unsigned long int buf_size,
439                      unsigned long int msg_size,
440                      unsigned long int msg_amount,
441                      double min_duration, /*OUT*/ double *sec, double *bw)
442 {
443
444   gras_socket_t sock;
445   /* The request */
446   bw_request_t request;
447   bw_res_t result;
448   request = xbt_new0(s_bw_request_t, 1);
449   request->buf_size = buf_size;
450   request->msg_size = msg_size;
451   request->msg_amount = msg_amount;
452   request->min_duration = min_duration;
453
454
455   request->peer.name = (char *) to_name;
456   request->peer.port = to_port;
457
458
459   sock = gras_socket_client(from_name, from_port);
460
461
462
463   XBT_DEBUG("Ask for a BW test between %s:%d and %s:%d", from_name, from_port,
464          to_name, to_port);
465   gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result);
466
467   if (sec)
468     *sec = result->sec;
469   if (bw)
470     *bw = result->bw;
471
472   XBT_VERB("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
473         from_name, from_port, to_name, to_port,
474         result->sec, ((double) result->bw) / 1024.0);
475
476   gras_socket_close(sock);
477   free(result);
478   free(request);
479 }
480
481 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload)
482 {
483
484   /* specification of the test to run, and our answer */
485   bw_request_t request = *(bw_request_t *) payload;
486   bw_res_t result = xbt_new0(s_bw_res_t, 1);
487   gras_socket_t peer, asker;
488
489   asker = gras_msg_cb_ctx_from(ctx);
490   XBT_VERB("Asked by %s:%d to conduct a bw XP with %s:%d (request: %ld %ld)",
491         gras_socket_peer_name(asker), gras_socket_peer_port(asker),
492         request->peer.name, request->peer.port,
493         request->msg_size, request->msg_amount);
494   peer = gras_socket_client(request->peer.name, request->peer.port);
495   amok_bw_test(peer,
496                request->buf_size, request->msg_size, request->msg_amount,
497                request->min_duration, &(result->sec), &(result->bw));
498
499   gras_msg_rpcreturn(240, ctx, &result);
500
501   gras_os_sleep(1);
502   gras_socket_close(peer);      /* FIXME: it should be blocking in RL until everything is sent */
503   free(request->peer.name);
504   free(request);
505   free(result);
506
507   return 0;
508 }
509
510 /** \brief builds a matrix of results of bandwidth measurement
511  * 
512  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
513  *           as the total amount of data to send and the msg_size. This
514  *           was changed for the fool wanting to send more than MAXINT
515  *           bytes in a fat pipe.
516  */
517 double *amok_bw_matrix(xbt_dynar_t peers,
518                        int buf_size_bw, int msg_size_bw, int msg_amount_bw,
519                        double min_duration)
520 {
521   double sec;
522   /* construction of matrices for bandwith and latency */
523
524
525   unsigned int i, j;
526   int len = xbt_dynar_length(peers);
527
528   double *matrix_res = xbt_new0(double, len * len);
529   xbt_peer_t p1, p2;
530
531   xbt_dynar_foreach(peers, i, p1) {
532     xbt_dynar_foreach(peers, j, p2) {
533       if (i != j) {
534         /* Mesurements of Bandwidth */
535         amok_bw_request(p1->name, p1->port, p2->name, p2->port,
536                         buf_size_bw, msg_size_bw, msg_amount_bw,
537                         min_duration, &sec, &matrix_res[i * len + j]);
538       }
539     }
540   }
541   return matrix_res;
542 }