Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
let it work in RL (more work needed for SG)
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 23 Jun 2005 15:29:37 +0000 (15:29 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Thu, 23 Jun 2005 15:29:37 +0000 (15:29 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1405 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/amok/Bandwidth/bandwidth.c

index a33b74e..2c2835c 100644 (file)
@@ -2,7 +2,7 @@
 
 /* amok_bandwidth - Bandwidth tests facilities                              */
 
 
 /* amok_bandwidth - Bandwidth tests facilities                              */
 
-/* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
+/* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -24,10 +24,14 @@ void amok_bw_init(void) {
        
      /* Build the datatype descriptions */ 
      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
        
      /* Build the datatype descriptions */ 
      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
-     gras_datadesc_struct_append(bw_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
-     gras_datadesc_struct_append(bw_request_desc,"buf_size",gras_datadesc_by_name("unsigned int"));
-     gras_datadesc_struct_append(bw_request_desc,"exp_size",gras_datadesc_by_name("unsigned int"));
-     gras_datadesc_struct_append(bw_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
+     gras_datadesc_struct_append(bw_request_desc,"host",
+                                gras_datadesc_by_name("xbt_host_t"));
+     gras_datadesc_struct_append(bw_request_desc,"buf_size",
+                                gras_datadesc_by_name("unsigned long int"));
+     gras_datadesc_struct_append(bw_request_desc,"exp_size",
+                                gras_datadesc_by_name("unsigned long int"));
+     gras_datadesc_struct_append(bw_request_desc,"msg_size",
+                                gras_datadesc_by_name("unsigned long int"));
      gras_datadesc_struct_close(bw_request_desc);
      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
 
      gras_datadesc_struct_close(bw_request_desc);
      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
 
@@ -101,23 +105,26 @@ void amok_bw_exit(void) {
  * ***************************************************************************/
 
 /**
  * ***************************************************************************/
 
 /**
- * amok_bw_test:
+ * \brief bandwidth measurement between localhost and @peer
  * 
  * 
- * Conduct a test between the local host and @peer, and 
- * report the result in last args
+ * Results are reported in last args, and sizes are in kb.
  */
 xbt_error_t amok_bw_test(gras_socket_t peer,
  */
 xbt_error_t amok_bw_test(gras_socket_t peer,
-                         unsigned int buf_size,unsigned int exp_size,unsigned int msg_size,
-                         /*OUT*/ double *sec, double *bw) {
-  gras_socket_t measIn,measOut; /* Measurement sockets for the experiments */
+                        unsigned long int buf_size,
+                        unsigned long int exp_size,
+                        unsigned long int msg_size,
+                        /*OUT*/ double *sec, double *bw) {
+
+  /* Measurement sockets for the experiments */
+  gras_socket_t measMasterIn,measIn,measOut;
   gras_socket_t sock_dummy; /* ignored arg to msg_wait */
   int port;
   xbt_error_t errcode;
   bw_request_t request,request_ack;
   
   for (port = 5000, errcode = system_error;
   gras_socket_t sock_dummy; /* ignored arg to msg_wait */
   int port;
   xbt_error_t errcode;
   bw_request_t request,request_ack;
   
   for (port = 5000, errcode = system_error;
-       errcode == system_error;
-       errcode = gras_socket_server_ext(++port,buf_size,1,&measIn));
+       errcode == system_error && port < 10000;
+       errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
   if (errcode != no_error) {
     ERROR1("Error %s encountered while opening a measurement socket",
           xbt_error_name(errcode));
   if (errcode != no_error) {
     ERROR1("Error %s encountered while opening a measurement socket",
           xbt_error_name(errcode));
@@ -125,19 +132,23 @@ xbt_error_t amok_bw_test(gras_socket_t peer,
   }
        
   request=xbt_new0(s_bw_request_t,1);
   }
        
   request=xbt_new0(s_bw_request_t,1);
-  request->buf_size=buf_size;
-  request->exp_size=exp_size;
-  request->msg_size=msg_size;
+  request->buf_size=buf_size*1024;
+  request->exp_size=exp_size*1024;
+  request->msg_size=msg_size*1024;
   request->host.name = NULL;
   request->host.name = NULL;
-  request->host.port = gras_socket_my_port(measIn);
-  INFO3("Handshaking with %s:%d to connect it back on my %d", 
-       gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port);
+  request->host.port = gras_socket_my_port(measMasterIn);
+  VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)", 
+       gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
+       buf_size,request->buf_size);
 
   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
     return errcode;
   }
 
   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
     return errcode;
   }
-  if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),&sock_dummy,&request_ack))) {
+  TRY(gras_socket_meas_accept(measMasterIn,&measIn));
+
+  if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
+                            &sock_dummy,&request_ack))) {
     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
            xbt_error_name(errcode));
     return errcode;
     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
            xbt_error_name(errcode));
     return errcode;
@@ -145,30 +156,37 @@ xbt_error_t amok_bw_test(gras_socket_t peer,
    
   /* FIXME: What if there is a remote error? */
    
    
   /* FIXME: What if there is a remote error? */
    
-  if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),request_ack->host.port, buf_size,1,&measOut))) {
+  if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
+                                    request_ack->host.port, 
+                                    request->buf_size,1,&measOut))) {
     ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
            xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
     return errcode;
   }
     ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
            xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
     return errcode;
   }
-  free(request_ack);
-  INFO0("Got ACK");
+  DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
 
   *sec=gras_os_time();
 
   *sec=gras_os_time();
-  if ((errcode=gras_socket_meas_send(measOut,120,exp_size,msg_size)) ||
-      (errcode=gras_socket_meas_recv(measIn,120,1,1))) {
+  TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
+  TRY(gras_socket_meas_recv(measIn,120,1,1));
+
+  /*catch
     ERROR1("Error %s encountered while sending the BW experiment.",
            xbt_error_name(errcode));
     gras_socket_close(measOut);
     ERROR1("Error %s encountered while sending the BW experiment.",
            xbt_error_name(errcode));
     gras_socket_close(measOut);
+    gras_socket_close(measMasterIn);
     gras_socket_close(measIn);
     gras_socket_close(measIn);
-    return errcode;
-  }
+  */
+
   *sec = gras_os_time() - *sec;
   *sec = gras_os_time() - *sec;
-  *bw = ((double)exp_size /* 8.0*/) / *sec / (1024.0 *1024.0);
-INFO0("DOOONE");
-   
-  gras_socket_close(measIn);
+  *bw = ((double)exp_size) / *sec;
+
+  free(request_ack);
+  free(request);
+  if (measIn != measMasterIn)
+    gras_socket_close(measIn);
+  gras_socket_close(measMasterIn);
   gras_socket_close(measOut);
   gras_socket_close(measOut);
-  return no_error;
+  return errcode;
 }
 
 
 }
 
 
@@ -177,64 +195,83 @@ INFO0("DOOONE");
    indicate its port in an "BW handshaked" message,
    receive the corresponding data on the measurement socket, 
    close the measurment socket
    indicate its port in an "BW handshaked" message,
    receive the corresponding data on the measurement socket, 
    close the measurment socket
+
+   sizes are in byte (got converted from kb my expeditor)
 */
 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
                            void          *payload) {
 */
 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
                            void          *payload) {
-  gras_socket_t measIn,measOut;
+  gras_socket_t measMasterIn,measIn,measOut;
   bw_request_t request=*(bw_request_t*)payload;
   bw_request_t answer;
   xbt_error_t errcode;
   int port;
   
   bw_request_t request=*(bw_request_t*)payload;
   bw_request_t answer;
   xbt_error_t errcode;
   int port;
   
-  INFO2("Handshaked to connect to %s:%d",
-       gras_socket_peer_name(expeditor),request->host.port);
-     
+  VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
+       gras_socket_peer_name(expeditor),request->host.port,
+       request->buf_size,request->exp_size,request->msg_size);     
+
+  /* Build our answer */
   answer = xbt_new0(s_bw_request_t,1);
   
   for (port = 6000, errcode = system_error;
        errcode == system_error;
   answer = xbt_new0(s_bw_request_t,1);
   
   for (port = 6000, errcode = system_error;
        errcode == system_error;
-       errcode = gras_socket_server_ext(++port,request->buf_size,1,&measIn));
+       errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
   if (errcode != no_error) {
     ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
     /* FIXME: tell error to remote */
     return 1;
   }
   if (errcode != no_error) {
     ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
     /* FIXME: tell error to remote */
     return 1;
   }
+   
+  answer->buf_size=request->buf_size;
+  answer->exp_size=request->exp_size;
+  answer->msg_size=request->msg_size;
+  answer->host.port=gras_socket_my_port(measMasterIn);
 
 
-  if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),request->host.port,
+  /* Don't connect asap to leave time to other side to enter the accept() */
+  if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
+                                     request->host.port,
                                      request->buf_size,1,&measOut))) { 
     ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d", 
           xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
     /* FIXME: tell error to remote */
     return 1;
   }
                                      request->buf_size,1,&measOut))) { 
     ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d", 
           xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
     /* FIXME: tell error to remote */
     return 1;
   }
-   
-  answer->buf_size=request->buf_size;
-  answer->exp_size=request->exp_size;
-  answer->msg_size=request->msg_size;
-  answer->host.port=gras_socket_my_port(measIn);
 
 
-  if ((errcode=gras_msg_send(expeditor,gras_msgtype_by_name("BW handshake ACK"),&answer))) {
+
+  if ((errcode=gras_msg_send(expeditor,
+                            gras_msgtype_by_name("BW handshake ACK"),
+                            &answer))) {
     ERROR1("Error %s encountered while sending the answer.",
            xbt_error_name(errcode));
     ERROR1("Error %s encountered while sending the answer.",
            xbt_error_name(errcode));
-    gras_socket_close(measIn);
+    gras_socket_close(measMasterIn);
     gras_socket_close(measOut);
     /* FIXME: tell error to remote */
     return 1;
   }
     gras_socket_close(measOut);
     /* FIXME: tell error to remote */
     return 1;
   }
-  INFO4("BW handshake answered. buf_size=%d exp_size=%d msg_size=%d port=%d",
+  TRY(gras_socket_meas_accept(measMasterIn,&measIn));
+  DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
        answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
        answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
-    
-  if ((errcode=gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size)) ||
-      (errcode=gras_socket_meas_send(measOut,120,1,1))) {
+
+  TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
+  TRY(gras_socket_meas_send(measOut,120,1,1));
+
+  /*catch
     ERROR1("Error %s encountered while receiving the experiment.",
            xbt_error_name(errcode));
     ERROR1("Error %s encountered while receiving the experiment.",
            xbt_error_name(errcode));
+    gras_socket_close(measMasterIn);
     gras_socket_close(measIn);
     gras_socket_close(measOut);
     gras_socket_close(measIn);
     gras_socket_close(measOut);
-    /* FIXME: tell error to remote ? */
+    * FIXME: tell error to remote ? *
     return 1;
     return 1;
-  }
+    }*/
+
+  if (measIn != measMasterIn)
+    gras_socket_close(measMasterIn);
   gras_socket_close(measIn);
   gras_socket_close(measOut);
   gras_socket_close(measIn);
   gras_socket_close(measOut);
+  free(answer);
+  free(request);
+  DEBUG0("BW experiment done.");
   return 1;
 }
 
   return 1;
 }