Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[smpi] add a gestion of non-contignous data
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* amok_bandwidth - Bandwidth tests facilities                              */
2
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2009, 2010. The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "xbt/ex.h"
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw, amok, "Bandwidth testing");
14
15
16 /******************************
17  * Stuff global to the module *
18  ******************************/
19
20 static short _amok_bw_initialized = 0;
21
22 /** @brief module initialization; all participating nodes must run this */
23 void amok_bw_init(void)
24 {
25
26   if (!_amok_bw_initialized) {
27     amok_bw_bw_init();
28     amok_bw_sat_init();
29   }
30
31   amok_bw_bw_join();
32   amok_bw_sat_join();
33
34   _amok_bw_initialized++;
35 }
36
37 /** @brief module finalization */
38 void amok_bw_exit(void)
39 {
40   if (!_amok_bw_initialized)
41     return;
42
43   amok_bw_bw_leave();
44   amok_bw_sat_leave();
45
46   _amok_bw_initialized--;
47 }
48
49 /* ***************************************************************************
50  * Bandwidth tests
51  * ***************************************************************************/
52 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
53 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
54
55 void amok_bw_bw_init()
56 {
57   xbt_datadesc_type_t bw_request_desc, bw_res_desc;
58
59   /* Build the Bandwidth datatype descriptions */
60   bw_request_desc = xbt_datadesc_struct("s_bw_request_t");
61   xbt_datadesc_struct_append(bw_request_desc, "peer",
62                               xbt_datadesc_by_name("s_xbt_peer_t"));
63   xbt_datadesc_struct_append(bw_request_desc, "buf_size",
64                               xbt_datadesc_by_name("unsigned long int"));
65   xbt_datadesc_struct_append(bw_request_desc, "msg_size",
66                               xbt_datadesc_by_name("unsigned long int"));
67   xbt_datadesc_struct_append(bw_request_desc, "msg_amount",
68                               xbt_datadesc_by_name("unsigned long int"));
69   xbt_datadesc_struct_append(bw_request_desc, "min_duration",
70                               xbt_datadesc_by_name("double"));
71   xbt_datadesc_struct_close(bw_request_desc);
72   bw_request_desc = xbt_datadesc_ref("bw_request_t", bw_request_desc);
73
74   bw_res_desc = xbt_datadesc_struct("s_bw_res_t");
75   xbt_datadesc_struct_append(bw_res_desc, "timestamp",
76                               xbt_datadesc_by_name("unsigned int"));
77   xbt_datadesc_struct_append(bw_res_desc, "seconds",
78                               xbt_datadesc_by_name("double"));
79   xbt_datadesc_struct_append(bw_res_desc, "bw",
80                               xbt_datadesc_by_name("double"));
81   xbt_datadesc_struct_close(bw_res_desc);
82   bw_res_desc = xbt_datadesc_ref("bw_res_t", bw_res_desc);
83
84   gras_msgtype_declare_rpc("BW handshake", bw_request_desc,
85                            bw_request_desc);
86
87   gras_msgtype_declare_rpc("BW reask", bw_request_desc, NULL);
88   gras_msgtype_declare("BW stop", NULL);
89
90   gras_msgtype_declare_rpc("BW request", bw_request_desc, bw_res_desc);
91 }
92
93 void amok_bw_bw_join()
94 {
95   gras_cb_register("BW request", &amok_bw_cb_bw_request);
96   gras_cb_register("BW handshake", &amok_bw_cb_bw_handshake);
97 }
98
99 void amok_bw_bw_leave()
100 {
101   gras_cb_unregister("BW request", &amok_bw_cb_bw_request);
102   gras_cb_unregister("BW handshake", &amok_bw_cb_bw_handshake);
103 }
104
105 /**
106  * \brief bandwidth measurement between localhost and \e peer
107  * 
108  * \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
109  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
110  * \arg msg_size: Size of each message sent. 
111  * \arg msg_amount: Amount of such messages to exchange 
112  * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
113  * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
114  * \arg bw: observed Bandwidth (in byte/s) 
115  *
116  * Conduct a bandwidth test from the local process to the given peer.
117  * This call is blocking until the end of the experiment.
118  *
119  * If the asked experiment lasts less than \a min_duration, another one will be
120  * launched (and others, if needed). msg_size will be multiplicated by
121  * MIN(20, (\a min_duration / measured_duration) *1.1) (plus 10% to be sure to eventually
122  * reach the \a min_duration). In that case, the reported bandwidth and
123  * duration are the ones of the last run. \a msg_size cannot go over 64Mb
124  * because we need to malloc a block of this size in RL to conduct the
125  * experiment, and we still don't want to visit the swap. In such case, the 
126  * number of messages is increased instead of their size.
127  *
128  * Results are reported in last args, and sizes are in byte.
129  * 
130  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
131  *           as the total amount of data to send and the msg_size. This
132  *           was changed for the fool wanting to send more than MAXINT
133  *           bytes in a fat pipe.
134  * 
135  */
136 void amok_bw_test(xbt_socket_t peer,
137                   unsigned long int buf_size,
138                   unsigned long int msg_size,
139                   unsigned long int msg_amount,
140                   double min_duration, /*OUT*/ double *sec, double *bw)
141 {
142
143   /* Measurement sockets for the experiments */
144   volatile xbt_socket_t measMasterIn = NULL, measIn, measOut = NULL;
145   volatile int port;
146   bw_request_t request, request_ack;
147   xbt_ex_t e;
148   int first_pass;
149
150   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
151     TRY {
152       measMasterIn = gras_socket_server_ext(++port, buf_size, 1);
153     }
154     CATCH(e) {
155       measMasterIn = NULL;
156       if (port == 10000 - 1) {
157         RETHROWF("Error caught while opening a measurement socket: %s");
158       } else {
159         xbt_ex_free(e);
160       }
161     }
162   }
163
164   request = xbt_new0(s_bw_request_t, 1);
165   request->buf_size = buf_size;
166   request->msg_size = msg_size;
167   request->msg_amount = msg_amount;
168   request->peer.name = NULL;
169   request->peer.port = xbt_socket_my_port(measMasterIn);
170   XBT_DEBUG
171       ("Handshaking with %s:%d to connect it back on my %d (bufsize=%lu, msg_size=%lu, msg_amount=%lu)",
172        xbt_socket_peer_name(peer), xbt_socket_peer_port(peer),
173        request->peer.port, request->buf_size, request->msg_size,
174        request->msg_amount);
175
176   TRY {
177     gras_msg_rpccall(peer, 15, "BW handshake", &request, &request_ack);
178   }
179   CATCH_ANONYMOUS {
180     RETHROWF("Error encountered while sending the BW request: %s");
181   }
182   measIn = xbt_socket_meas_accept(measMasterIn);
183
184   TRY {
185     measOut = gras_socket_client_ext(xbt_socket_peer_name(peer),
186                                      request_ack->peer.port,
187                                      request->buf_size, 1);
188   }
189   CATCH_ANONYMOUS {
190     RETHROWF
191         ("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
192          xbt_socket_peer_name(peer), request_ack->peer.port);
193   }
194   XBT_DEBUG
195       ("Got ACK; conduct the experiment (msg_size = %lu, msg_amount=%lu)",
196        request->msg_size, request->msg_amount);
197
198   *sec = 0;
199   first_pass = 1;
200   do {
201     if (first_pass == 0) {
202       double meas_duration = *sec;
203       double increase;
204       if (*sec != 0.0) {
205         increase = (min_duration / meas_duration) * 1.1;
206       } else {
207         increase = 4;
208       }
209       /* Do not increase the exp size too fast since our decision would be based on wrong measurements */
210       if (increase > 20)
211         increase = 20;
212
213       request->msg_size = request->msg_size * increase;
214
215       /* Do not do too large experiments messages or the sensors 
216          will start to swap to store one of them.
217          And then increase the number of messages to compensate (check for overflow there, too) */
218       if (request->msg_size > 64 * 1024 * 1024) {
219         unsigned long int new_amount =
220             ((request->msg_size / ((double) 64 * 1024 * 1024))
221              * request->msg_amount) + 1;
222
223         xbt_assert(new_amount > request->msg_amount,
224                     "Overflow on the number of messages! You must have a *really* fat pipe. Please fix your platform");
225         request->msg_amount = new_amount;
226
227         request->msg_size = 64 * 1024 * 1024;
228       }
229
230       XBT_VERB
231           ("The experiment was too short (%f sec<%f sec). Redo it with msg_size=%lu (nb_messages=%lu) (got %fMb/s)",
232            meas_duration, min_duration, request->msg_size,
233            request->msg_amount,
234            ((double) request->msg_size) * ((double) request->msg_amount /
235                                            (*sec) / 1024.0 / 1024.0));
236
237       gras_msg_rpccall(peer, 60, "BW reask", &request, NULL);
238     }
239
240     first_pass = 0;
241     *sec = gras_os_time();
242     TRY {
243       xbt_socket_meas_send(measOut, 120, request->msg_size,
244                             request->msg_amount);
245       XBT_DEBUG("Data sent. Wait ACK");
246       xbt_socket_meas_recv(measIn, 120, 1, 1);
247     }
248     CATCH_ANONYMOUS {
249       gras_socket_close(measOut);
250       gras_socket_close(measMasterIn);
251       gras_socket_close(measIn);
252       RETHROWF("Unable to conduct the experiment: %s");
253     }
254     *sec = gras_os_time() - *sec;
255     if (*sec != 0.0) {
256       *bw =
257           ((double) request->msg_size) * ((double) request->msg_amount) /
258           (*sec);
259     }
260     XBT_DEBUG("Experiment done ; it took %f sec", *sec);
261     if (*sec <= 0) {
262       XBT_CRITICAL("Nonpositive value (%f) found for BW test time.", *sec);
263     }
264
265   } while (*sec < min_duration);
266
267   XBT_DEBUG
268       ("This measurement was long enough (%f sec; found %f b/s). Stop peer",
269        *sec, *bw);
270   gras_msg_send(peer, "BW stop", NULL);
271
272   free(request_ack);
273   free(request);
274   if (measIn != measMasterIn)
275     gras_socket_close(measIn);
276   gras_socket_close(measMasterIn);
277   gras_socket_close(measOut);
278 }
279
280
281 /* Callback to the "BW handshake" message: 
282    opens a server measurement socket,
283    indicate its port in an "BW handshaked" message,
284    receive the corresponding data on the measurement socket, 
285    close the measurement socket
286
287    sizes are in byte
288 */
289 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload)
290 {
291   xbt_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
292   volatile xbt_socket_t measMasterIn = NULL,  measIn = NULL, measOut = NULL;
293   volatile bw_request_t request = *(bw_request_t *) payload;
294   bw_request_t answer;
295   xbt_ex_t e;
296   int port;
297   int tooshort = 1;
298   gras_msg_cb_ctx_t ctx_reask;
299   static xbt_dynar_t msgtwaited = NULL;
300
301   XBT_DEBUG
302       ("Handshaked to connect to %s:%d (sizes: buf=%lu msg=%lu msg_amount=%lu)",
303        xbt_socket_peer_name(expeditor), request->peer.port,
304        request->buf_size, request->msg_size, request->msg_amount);
305
306   /* Build our answer */
307   answer = xbt_new0(s_bw_request_t, 1);
308
309   for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
310     TRY {
311       measMasterIn = gras_socket_server_ext(port, request->buf_size, 1);
312     }
313     CATCH(e) {
314       measMasterIn = NULL;
315       if (port < 10000)
316         xbt_ex_free(e);
317       else
318         /* FIXME: tell error to remote */
319         RETHROWF
320             ("Error encountered while opening a measurement server socket: %s");
321     }
322   }
323
324   answer->buf_size = request->buf_size;
325   answer->msg_size = request->msg_size;
326   answer->msg_amount = request->msg_amount;
327   answer->peer.port = xbt_socket_my_port(measMasterIn);
328
329   TRY {
330     gras_msg_rpcreturn(60, ctx, &answer);
331   }
332   CATCH_ANONYMOUS {
333     gras_socket_close(measMasterIn);
334     /* FIXME: tell error to remote */
335     RETHROWF("Error encountered while sending the answer: %s");
336   }
337
338
339   /* Don't connect asap to leave time to other side to enter the accept() */
340   TRY {
341     measOut = gras_socket_client_ext(xbt_socket_peer_name(expeditor),
342                                      request->peer.port,
343                                      request->buf_size, 1);
344   }
345   CATCH_ANONYMOUS {
346     RETHROWF
347         ("Error encountered while opening a measurement socket back to %s:%d : %s",
348          xbt_socket_peer_name(expeditor), request->peer.port);
349     /* FIXME: tell error to remote */
350   }
351
352   TRY {
353     measIn = xbt_socket_meas_accept(measMasterIn);
354     XBT_DEBUG
355         ("BW handshake answered. buf_size=%lu msg_size=%lu msg_amount=%lu port=%d",
356          answer->buf_size, answer->msg_size, answer->msg_amount,
357          answer->peer.port);
358   }
359   CATCH_ANONYMOUS {
360     gras_socket_close(measMasterIn);
361     gras_socket_close(measIn);
362     gras_socket_close(measOut);
363     /* FIXME: tell error to remote ? */
364     RETHROWF("Error encountered while opening the meas socket: %s");
365   }
366
367   if (!msgtwaited) {
368     msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t), NULL);
369     xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW stop"));
370     xbt_dynar_push(msgtwaited, gras_msgtype_by_name("BW reask"));
371   }
372
373   while (tooshort) {
374     void *payloadgot;
375     int msggot;
376     TRY {
377       xbt_socket_meas_recv(measIn, 120, request->msg_size,
378                             request->msg_amount);
379       xbt_socket_meas_send(measOut, 120, 1, 1);
380     }
381     CATCH_ANONYMOUS {
382       gras_socket_close(measMasterIn);
383       gras_socket_close(measIn);
384       gras_socket_close(measOut);
385       /* FIXME: tell error to remote ? */
386       RETHROWF("Error encountered while receiving the experiment: %s");
387     }
388     gras_msg_wait_or(60, msgtwaited, &ctx_reask, &msggot, &payloadgot);
389     switch (msggot) {
390     case 0:                    /* BW stop */
391       tooshort = 0;
392       break;
393     case 1:                    /* BW reask */
394       tooshort = 1;
395       free(request);
396       request = (bw_request_t) payloadgot;
397       XBT_VERB("Return the reasking RPC");
398       gras_msg_rpcreturn(60, ctx_reask, NULL);
399     }
400     gras_msg_cb_ctx_free(ctx_reask);
401   }
402
403   if (measIn != measMasterIn)
404     gras_socket_close(measMasterIn);
405   gras_socket_close(measIn);
406   gras_socket_close(measOut);
407   free(answer);
408   free(request);
409   XBT_VERB("BW experiment done.");
410   return 0;
411 }
412
413 /**
414  * \brief request a bandwidth measurement between two remote peers
415  *
416  * \arg from_name: Name of the first peer 
417  * \arg from_port: port on which the first process is listening for messages
418  * \arg to_name: Name of the second peer 
419  * \arg to_port: port on which the second process is listening (for messages, do not 
420  * give a measurement socket here. The needed measurement sockets will be created 
421  * automatically and negociated between the peers)
422  * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
423  * \arg msg_size: Size of each message sent. 
424  * \arg msg_amount: Amount of such data to exchange
425  * \arg sec: where the result (in seconds) should be stored.
426  * \arg bw: observed Bandwidth (in byte/s)
427  *
428  * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
429  * This call is blocking until the end of the experiment.
430  *
431  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
432  *           as the total amount of data to send and the msg_size. This
433  *           was changed for the fool wanting to send more than MAXINT
434  *           bytes in a fat pipe.
435  * 
436  * Results are reported in last args, and sizes are in bytes.
437  */
438 void amok_bw_request(const char *from_name, unsigned int from_port,
439                      const char *to_name, unsigned int to_port,
440                      unsigned long int buf_size,
441                      unsigned long int msg_size,
442                      unsigned long int msg_amount,
443                      double min_duration, /*OUT*/ double *sec, double *bw)
444 {
445
446   xbt_socket_t sock;
447   /* The request */
448   bw_request_t request;
449   bw_res_t result;
450   request = xbt_new0(s_bw_request_t, 1);
451   request->buf_size = buf_size;
452   request->msg_size = msg_size;
453   request->msg_amount = msg_amount;
454   request->min_duration = min_duration;
455
456
457   request->peer.name = (char *) to_name;
458   request->peer.port = to_port;
459
460
461   sock = gras_socket_client(from_name, from_port);
462
463
464
465   XBT_DEBUG("Ask for a BW test between %s:%u and %s:%u", from_name, from_port,
466          to_name, to_port);
467   gras_msg_rpccall(sock, 20 * 60, "BW request", &request, &result);
468
469   if (sec)
470     *sec = result->sec;
471   if (bw)
472     *bw = result->bw;
473
474   XBT_VERB("BW test (%s:%u -> %s:%u) took %f sec (%f kb/s)",
475         from_name, from_port, to_name, to_port,
476         result->sec, ((double) result->bw) / 1024.0);
477
478   gras_socket_close(sock);
479   free(result);
480   free(request);
481 }
482
483 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload)
484 {
485
486   /* specification of the test to run, and our answer */
487   bw_request_t request = *(bw_request_t *) payload;
488   bw_res_t result = xbt_new0(s_bw_res_t, 1);
489   xbt_socket_t peer, asker;
490
491   asker = gras_msg_cb_ctx_from(ctx);
492   XBT_VERB("Asked by %s:%d to conduct a bw XP with %s:%d (request: %lu %lu)",
493         xbt_socket_peer_name(asker), xbt_socket_peer_port(asker),
494         request->peer.name, request->peer.port,
495         request->msg_size, request->msg_amount);
496   peer = gras_socket_client(request->peer.name, request->peer.port);
497   amok_bw_test(peer,
498                request->buf_size, request->msg_size, request->msg_amount,
499                request->min_duration, &(result->sec), &(result->bw));
500
501   gras_msg_rpcreturn(240, ctx, &result);
502
503   gras_os_sleep(1);
504   gras_socket_close(peer);      /* FIXME: it should be blocking in RL until everything is sent */
505   free(request->peer.name);
506   free(request);
507   free(result);
508
509   return 0;
510 }
511
512 /** \brief builds a matrix of results of bandwidth measurement
513  * 
514  * @warning: in SimGrid version 3.1 and previous, the experiment size were specified
515  *           as the total amount of data to send and the msg_size. This
516  *           was changed for the fool wanting to send more than MAXINT
517  *           bytes in a fat pipe.
518  */
519 double *amok_bw_matrix(xbt_dynar_t peers,
520                        int buf_size_bw, int msg_size_bw, int msg_amount_bw,
521                        double min_duration)
522 {
523   double sec;
524   /* construction of matrices for bandwith and latency */
525
526
527   unsigned int i, j;
528   int len = xbt_dynar_length(peers);
529
530   double *matrix_res = xbt_new0(double, len * len);
531   xbt_peer_t p1, p2;
532
533   xbt_dynar_foreach(peers, i, p1) {
534     xbt_dynar_foreach(peers, j, p2) {
535       if (i != j) {
536         /* Mesurements of Bandwidth */
537         amok_bw_request(p1->name, p1->port, p2->name, p2->port,
538                         buf_size_bw, msg_size_bw, msg_amount_bw,
539                         min_duration, &sec, &matrix_res[i * len + j]);
540       }
541     }
542   }
543   return matrix_res;
544 }