Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make AMOK bandwidth tests more robust.
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
index fe73944..79ab644 100644 (file)
@@ -25,8 +25,6 @@ static short _amok_bw_initialized = 0;
 /** @brief module initialization; all participating nodes must run this */
 void amok_bw_init(void) {
 
-  amok_base_init();
-
   if (! _amok_bw_initialized) {
     amok_bw_bw_init();
     amok_bw_sat_init();
@@ -60,8 +58,8 @@ void amok_bw_bw_init() {
 
   /* Build the Bandwidth datatype descriptions */ 
   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
-  gras_datadesc_struct_append(bw_request_desc,"host",
-                             gras_datadesc_by_name("s_xbt_host_t"));
+  gras_datadesc_struct_append(bw_request_desc,"peer",
+                             gras_datadesc_by_name("s_xbt_peer_t"));
   gras_datadesc_struct_append(bw_request_desc,"buf_size",
                              gras_datadesc_by_name("unsigned long int"));
   gras_datadesc_struct_append(bw_request_desc,"exp_size",
@@ -114,6 +112,14 @@ void amok_bw_bw_leave() {
  * Conduct a bandwidth test from the local process to the given peer.
  * This call is blocking until the end of the experiment.
  *
+ * If the asked experiment lasts less than \a min_duration, another one will be
+ * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
+ * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
+ * reach the \a min_duration). In that case, the reported bandwidth and
+ * duration are the ones of the last run. \a msg_size cannot go over 64Mb
+ * because we need to malloc a block of this size in RL to conduct the
+ * experiment, and we still don't want to visit the swap.
+ *
  * Results are reported in last args, and sizes are in byte.
  */
 void amok_bw_test(gras_socket_t peer,
@@ -128,6 +134,7 @@ void amok_bw_test(gras_socket_t peer,
   int port;
   bw_request_t request,request_ack;
   xbt_ex_t e;
+  int first_pass; 
   
   for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
     TRY {
@@ -146,10 +153,10 @@ void amok_bw_test(gras_socket_t peer,
   request->buf_size=buf_size;
   request->exp_size=exp_size;
   request->msg_size=msg_size;
-  request->host.name = NULL;
-  request->host.port = gras_socket_my_port(measMasterIn);
-  DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
-       gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
+  request->peer.name = NULL;
+  request->peer.port = gras_socket_my_port(measMasterIn);
+  DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%ld byte= %ld b)", 
+       gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
        buf_size,request->buf_size);
 
   TRY {
@@ -162,28 +169,37 @@ void amok_bw_test(gras_socket_t peer,
    
   TRY {
     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
-                                  request_ack->host.port, 
+                                  request_ack->peer.port, 
                                   request->buf_size,1);
   } CATCH(e) {
     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
-            gras_socket_peer_name(peer),request_ack->host.port);
+            gras_socket_peer_name(peer),request_ack->peer.port);
   }
-  DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
+  DEBUG2("Got ACK; conduct the experiment (exp_size = %ld, msg_size=%ld)",
+        request->exp_size, request->msg_size);
 
   *sec = 0;
+  first_pass = 1;
   do {
-    if (*sec>0) {
+    if (first_pass == 0) {
       double meas_duration=*sec;
-      request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
-      request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
-
+      if (*sec != 0.0 ) { 
+       request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
+       request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
+      } else {
+       request->exp_size = request->exp_size * 4; 
+       request->msg_size = request->msg_size * 4; 
+      }
+            
+      if (request->msg_size > 64*1024*1024)
+       request->msg_size = 64*1024*1024;
 
-      DEBUG5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
+      VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
             meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
-      DEBUG0("Peer is ready for another round of fun");
     }
 
+    first_pass = 0;
     *sec=gras_os_time();
     TRY {
       gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
@@ -195,10 +211,13 @@ void amok_bw_test(gras_socket_t peer,
       gras_socket_close(measIn);
       RETHROW0("Unable to conduct the experiment: %s");
     }
-    DEBUG0("Experiment done");
-
     *sec = gras_os_time() - *sec;
-    *bw = ((double)request->exp_size) / *sec;
+    if (*sec != 0.0) { *bw = ((double)request->exp_size) / *sec; }
+    DEBUG1("Experiment done ; it took %f sec", *sec);
+    if (*sec <= 0) {
+      CRITICAL1("Nonpositive value (%f) found for BW test time.", *sec);
+    }
+
   } while (*sec < min_duration);
 
   DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
@@ -235,7 +254,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
   static xbt_dynar_t msgtwaited=NULL;
   
   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
-       gras_socket_peer_name(expeditor),request->host.port,
+       gras_socket_peer_name(expeditor),request->peer.port,
        request->buf_size,request->exp_size,request->msg_size);     
 
   /* Build our answer */
@@ -257,7 +276,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
   answer->buf_size=request->buf_size;
   answer->exp_size=request->exp_size;
   answer->msg_size=request->msg_size;
-  answer->host.port=gras_socket_my_port(measMasterIn);
+  answer->peer.port=gras_socket_my_port(measMasterIn);
 
   TRY {
     gras_msg_rpcreturn(60,ctx,&answer);
@@ -271,18 +290,18 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
   /* Don't connect asap to leave time to other side to enter the accept() */
   TRY {
     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
-                                    request->host.port,
+                                    request->peer.port,
                                     request->buf_size,1);
   } CATCH(e) {
     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
-            gras_socket_peer_name(expeditor),request->host.port);
+            gras_socket_peer_name(expeditor),request->peer.port);
     /* FIXME: tell error to remote */
   }
 
   TRY {
     measIn = gras_socket_meas_accept(measMasterIn);
     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
-          answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
+          answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
   } CATCH(e) {
     gras_socket_close(measMasterIn);
     gras_socket_close(measIn);
@@ -301,10 +320,8 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
     void *payload;
     int msggot;
     TRY {
-      DEBUG0("Recv / Send the experiment");
       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
       gras_socket_meas_send(measOut,120,1,1);
-      DEBUG0("ACK sent");
     } CATCH(e) {
       gras_socket_close(measMasterIn);
       gras_socket_close(measIn);
@@ -338,11 +355,11 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
 }
 
 /**
- * \brief request a bandwidth measurement between two remote hosts
+ * \brief request a bandwidth measurement between two remote peers
  *
- * \arg from_name: Name of the first host 
+ * \arg from_name: Name of the first peer 
  * \arg from_port: port on which the first process is listening for messages
- * \arg to_name: Name of the second host 
+ * \arg to_name: Name of the second peer 
  * \arg to_port: port on which the second process is listening (for messages, do not 
  * give a measurement socket here. The needed measurement sockets will be created 
  * automatically and negociated between the peers)
@@ -352,7 +369,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
  * \arg sec: where the result (in seconds) should be stored.
  * \arg bw: observed Bandwidth (in byte/s)
  *
- * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
+ * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
  * This call is blocking until the end of the experiment.
  *
  * Results are reported in last args, and sizes are in bytes.
@@ -369,21 +386,24 @@ void amok_bw_request(const char* from_name,unsigned int from_port,
   /* The request */
   bw_request_t request;
   bw_res_t result;
-
   request=xbt_new0(s_bw_request_t,1);
   request->buf_size=buf_size;
   request->exp_size=exp_size;
   request->msg_size=msg_size;
   request->min_duration = min_duration;
 
-  request->host.name = (char*)to_name;
-  request->host.port = to_port;
+
+  request->peer.name = (char*)to_name;
+  request->peer.port = to_port;
+
 
   sock = gras_socket_client(from_name,from_port);
+    
   DEBUG4("Ask for a BW test between %s:%d and %s:%d",  from_name,from_port, to_name,to_port);
-
   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
-  
+
   if (sec)
     *sec=result->sec;
   if (bw)
@@ -409,18 +429,19 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
   asker=gras_msg_cb_ctx_from(ctx);
   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",        
        gras_socket_peer_name(asker),gras_socket_peer_port(asker),
-       request->host.name,request->host.port);
-  peer = gras_socket_client(request->host.name,request->host.port);
+
+       request->peer.name,request->peer.port);
+  peer = gras_socket_client(request->peer.name,request->peer.port);
   amok_bw_test(peer,
               request->buf_size,request->exp_size,request->msg_size,
               request->min_duration,
               &(result->sec),&(result->bw));
-
   gras_msg_rpcreturn(240,ctx,&result);
 
   gras_os_sleep(1);
   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
-  free(request->host.name);
+  free(request->peer.name);
   free(request);
   free(result);
   
@@ -428,23 +449,23 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
 }
 
 /** \brief builds a matrix of results of bandwidth measurement */
-double * amok_bw_matrix(xbt_dynar_t hosts,
+double * amok_bw_matrix(xbt_dynar_t peers,
                        int buf_size_bw, int exp_size_bw, int msg_size_bw,
                        double min_duration) { 
   double sec;
   /* construction of matrices for bandwith and latency */
 
 
-  int i,j,len=xbt_dynar_length(hosts);
+  int i,j,len=xbt_dynar_length(peers);
 
   double *matrix_res = xbt_new0(double, len*len);
-  xbt_host_t h1,h2;
+  xbt_peer_t p1,p2;
 
-  xbt_dynar_foreach (hosts,i,h1) {
-    xbt_dynar_foreach (hosts,j,h2) {
+  xbt_dynar_foreach (peers,i,p1) {
+    xbt_dynar_foreach (peers,j,p2) {
       if (i!=j) {
         /* Mesurements of Bandwidth */
-        amok_bw_request(h1->name,h1->port,h2->name,h2->port,
+        amok_bw_request(p1->name,p1->port,p2->name,p2->port,
                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
                        &sec,&matrix_res[i*len + j]);
       }