Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Revert Darina's last changes. This is definitively not the way to go
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
index fe73944..415ba4b 100644 (file)
@@ -60,8 +60,8 @@ void amok_bw_bw_init() {
 
   /* Build the Bandwidth datatype descriptions */ 
   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
-  gras_datadesc_struct_append(bw_request_desc,"host",
-                             gras_datadesc_by_name("s_xbt_host_t"));
+  gras_datadesc_struct_append(bw_request_desc,"peer",
+                             gras_datadesc_by_name("s_xbt_peer_t"));
   gras_datadesc_struct_append(bw_request_desc,"buf_size",
                              gras_datadesc_by_name("unsigned long int"));
   gras_datadesc_struct_append(bw_request_desc,"exp_size",
@@ -114,6 +114,14 @@ void amok_bw_bw_leave() {
  * Conduct a bandwidth test from the local process to the given peer.
  * This call is blocking until the end of the experiment.
  *
+ * If the asked experiment lasts less than \a min_duration, another one will be
+ * launched. Sizes (both \a exp_size and \a msg_size) will be multiplicated by
+ * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
+ * reach the \a min_duration). In that case, the reported bandwidth and
+ * duration are the ones of the last run. \a msg_size cannot go over 64Mb
+ * because we need to malloc a block of this size in RL to conduct the
+ * experiment, and we still don't want to visit the swap.
+ *
  * Results are reported in last args, and sizes are in byte.
  */
 void amok_bw_test(gras_socket_t peer,
@@ -146,10 +154,10 @@ void amok_bw_test(gras_socket_t peer,
   request->buf_size=buf_size;
   request->exp_size=exp_size;
   request->msg_size=msg_size;
-  request->host.name = NULL;
-  request->host.port = gras_socket_my_port(measMasterIn);
+  request->peer.name = NULL;
+  request->peer.port = gras_socket_my_port(measMasterIn);
   DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
-       gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
+       gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
        buf_size,request->buf_size);
 
   TRY {
@@ -162,11 +170,11 @@ void amok_bw_test(gras_socket_t peer,
    
   TRY {
     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
-                                  request_ack->host.port, 
+                                  request_ack->peer.port, 
                                   request->buf_size,1);
   } CATCH(e) {
     RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
-            gras_socket_peer_name(peer),request_ack->host.port);
+            gras_socket_peer_name(peer),request_ack->peer.port);
   }
   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
 
@@ -176,12 +184,12 @@ void amok_bw_test(gras_socket_t peer,
       double meas_duration=*sec;
       request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
       request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
+      if (request->msg_size > 64*1024*1024)
+       request->msg_size = 64*1024*1024;
 
-
-      DEBUG5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
+      VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld msg_size=%ld (got %fkb/s)",
             meas_duration,min_duration,request->exp_size,request->msg_size,((double)exp_size) / *sec/1024);
       gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);      
-      DEBUG0("Peer is ready for another round of fun");
     }
 
     *sec=gras_os_time();
@@ -235,7 +243,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
   static xbt_dynar_t msgtwaited=NULL;
   
   DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
-       gras_socket_peer_name(expeditor),request->host.port,
+       gras_socket_peer_name(expeditor),request->peer.port,
        request->buf_size,request->exp_size,request->msg_size);     
 
   /* Build our answer */
@@ -257,7 +265,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
   answer->buf_size=request->buf_size;
   answer->exp_size=request->exp_size;
   answer->msg_size=request->msg_size;
-  answer->host.port=gras_socket_my_port(measMasterIn);
+  answer->peer.port=gras_socket_my_port(measMasterIn);
 
   TRY {
     gras_msg_rpcreturn(60,ctx,&answer);
@@ -271,18 +279,18 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
   /* Don't connect asap to leave time to other side to enter the accept() */
   TRY {
     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
-                                    request->host.port,
+                                    request->peer.port,
                                     request->buf_size,1);
   } CATCH(e) {
     RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s", 
-            gras_socket_peer_name(expeditor),request->host.port);
+            gras_socket_peer_name(expeditor),request->peer.port);
     /* FIXME: tell error to remote */
   }
 
   TRY {
     measIn = gras_socket_meas_accept(measMasterIn);
     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
-          answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
+          answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
   } CATCH(e) {
     gras_socket_close(measMasterIn);
     gras_socket_close(measIn);
@@ -301,10 +309,8 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
     void *payload;
     int msggot;
     TRY {
-      DEBUG0("Recv / Send the experiment");
       gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
       gras_socket_meas_send(measOut,120,1,1);
-      DEBUG0("ACK sent");
     } CATCH(e) {
       gras_socket_close(measMasterIn);
       gras_socket_close(measIn);
@@ -338,11 +344,11 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
 }
 
 /**
- * \brief request a bandwidth measurement between two remote hosts
+ * \brief request a bandwidth measurement between two remote peers
  *
- * \arg from_name: Name of the first host 
+ * \arg from_name: Name of the first peer 
  * \arg from_port: port on which the first process is listening for messages
- * \arg to_name: Name of the second host 
+ * \arg to_name: Name of the second peer 
  * \arg to_port: port on which the second process is listening (for messages, do not 
  * give a measurement socket here. The needed measurement sockets will be created 
  * automatically and negociated between the peers)
@@ -352,7 +358,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
  * \arg sec: where the result (in seconds) should be stored.
  * \arg bw: observed Bandwidth (in byte/s)
  *
- * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
+ * Conduct a bandwidth test from the process from_peer:from_port to to_peer:to_port.
  * This call is blocking until the end of the experiment.
  *
  * Results are reported in last args, and sizes are in bytes.
@@ -369,21 +375,24 @@ void amok_bw_request(const char* from_name,unsigned int from_port,
   /* The request */
   bw_request_t request;
   bw_res_t result;
-
   request=xbt_new0(s_bw_request_t,1);
   request->buf_size=buf_size;
   request->exp_size=exp_size;
   request->msg_size=msg_size;
   request->min_duration = min_duration;
 
-  request->host.name = (char*)to_name;
-  request->host.port = to_port;
+
+  request->peer.name = (char*)to_name;
+  request->peer.port = to_port;
+
 
   sock = gras_socket_client(from_name,from_port);
+    
   DEBUG4("Ask for a BW test between %s:%d and %s:%d",  from_name,from_port, to_name,to_port);
-
   gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
-  
+
   if (sec)
     *sec=result->sec;
   if (bw)
@@ -409,18 +418,19 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
   asker=gras_msg_cb_ctx_from(ctx);
   VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",        
        gras_socket_peer_name(asker),gras_socket_peer_port(asker),
-       request->host.name,request->host.port);
-  peer = gras_socket_client(request->host.name,request->host.port);
+
+       request->peer.name,request->peer.port);
+  peer = gras_socket_client(request->peer.name,request->peer.port);
   amok_bw_test(peer,
               request->buf_size,request->exp_size,request->msg_size,
               request->min_duration,
               &(result->sec),&(result->bw));
-
   gras_msg_rpcreturn(240,ctx,&result);
 
   gras_os_sleep(1);
   gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
-  free(request->host.name);
+  free(request->peer.name);
   free(request);
   free(result);
   
@@ -428,23 +438,23 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
 }
 
 /** \brief builds a matrix of results of bandwidth measurement */
-double * amok_bw_matrix(xbt_dynar_t hosts,
+double * amok_bw_matrix(xbt_dynar_t peers,
                        int buf_size_bw, int exp_size_bw, int msg_size_bw,
                        double min_duration) { 
   double sec;
   /* construction of matrices for bandwith and latency */
 
 
-  int i,j,len=xbt_dynar_length(hosts);
+  int i,j,len=xbt_dynar_length(peers);
 
   double *matrix_res = xbt_new0(double, len*len);
-  xbt_host_t h1,h2;
+  xbt_peer_t p1,p2;
 
-  xbt_dynar_foreach (hosts,i,h1) {
-    xbt_dynar_foreach (hosts,j,h2) {
+  xbt_dynar_foreach (peers,i,p1) {
+    xbt_dynar_foreach (peers,j,p2) {
       if (i!=j) {
         /* Mesurements of Bandwidth */
-        amok_bw_request(h1->name,h1->port,h2->name,h2->port,
+        amok_bw_request(p1->name,p1->port,p2->name,p2->port,
                         buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
                        &sec,&matrix_res[i*len + j]);
       }