Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
allow caller to not be interested in all the data we can provide
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
index 4d23b5c..9e61fe2 100644 (file)
@@ -2,7 +2,9 @@
 
 /* amok_bandwidth - Bandwidth tests facilities                              */
 
-/* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
+/* Copyright (c) 2003-6 Martin Quinson.                                     */
+/* Copyright (c) 2006   Ahmed Harbaoui.                                     */
+/* All rights reserved.                                                     */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
 
+
+/******************************
+ * Stuff global to the module *
+ ******************************/
+
 static short _amok_bw_initialized = 0;
 
 /** @brief module initialization; all participating nodes must run this */
 void amok_bw_init(void) {
-  gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
 
   amok_base_init();
 
   if (! _amok_bw_initialized) {
-       
-     /* Build the datatype descriptions */ 
-     bw_request_desc = gras_datadesc_struct("s_bw_request_t");
-     gras_datadesc_struct_append(bw_request_desc,"host",
-                                gras_datadesc_by_name("xbt_host_t"));
-     gras_datadesc_struct_append(bw_request_desc,"buf_size",
-                                gras_datadesc_by_name("unsigned long int"));
-     gras_datadesc_struct_append(bw_request_desc,"exp_size",
-                                gras_datadesc_by_name("unsigned long int"));
-     gras_datadesc_struct_append(bw_request_desc,"msg_size",
-                                gras_datadesc_by_name("unsigned long int"));
-     gras_datadesc_struct_close(bw_request_desc);
-     bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
-
-     bw_res_desc = gras_datadesc_struct("s_bw_res_t");
-     gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
-     gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
-     gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
-     gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
-     gras_datadesc_struct_close(bw_res_desc);
-     bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
-
-     sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
-     gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
-     gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
-     gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
-     gras_datadesc_struct_close(sat_request_desc);
-     sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
-     
-     /* Register the bandwidth messages */
-     gras_msgtype_declare("BW handshake",     bw_request_desc);
-     gras_msgtype_declare("BW handshake ACK", bw_request_desc);
-     gras_msgtype_declare("BW request",       bw_request_desc);
-     gras_msgtype_declare("BW result",        bw_res_desc);
-     
-     /* Register the saturation messages */
-     gras_msgtype_declare("SAT start",   sat_request_desc);
-     gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
-     gras_msgtype_declare("SAT begin",   sat_request_desc);
-     gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
-     gras_msgtype_declare("SAT end",     NULL);
-     gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
-     gras_msgtype_declare("SAT stop",    NULL);
-     gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
+    amok_bw_bw_init();
+    amok_bw_sat_init();
   }
    
-  /* Register the callbacks */
-  gras_cb_register(gras_msgtype_by_name("BW request"),
-                  &amok_bw_cb_bw_request);
-  gras_cb_register(gras_msgtype_by_name("BW handshake"),
-                  &amok_bw_cb_bw_handshake);
+  amok_bw_bw_join();
+  amok_bw_sat_join();
 
-  gras_cb_register(gras_msgtype_by_name("SAT start"),
-                  &amok_bw_cb_sat_start);
-  gras_cb_register(gras_msgtype_by_name("SAT begin"),
-                  &amok_bw_cb_sat_begin);
-  
-  _amok_bw_initialized =1;
+  _amok_bw_initialized++;
 }
 
 /** @brief module finalization */
@@ -87,22 +43,56 @@ void amok_bw_exit(void) {
   if (! _amok_bw_initialized)
     return;
    
-  gras_cb_unregister(gras_msgtype_by_name("BW request"),
-                    &amok_bw_cb_bw_request);
-  gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
-                    &amok_bw_cb_bw_handshake);
-
-  gras_cb_unregister(gras_msgtype_by_name("SAT start"),
-                    &amok_bw_cb_sat_start);
-  gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
-                    &amok_bw_cb_sat_begin);
+  amok_bw_bw_leave();
+  amok_bw_sat_leave();
 
-  _amok_bw_initialized = 0;
+  _amok_bw_initialized--;
 }
 
 /* ***************************************************************************
  * Bandwidth tests
  * ***************************************************************************/
+static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
+static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
+
+void amok_bw_bw_init() {
+  gras_datadesc_type_t bw_request_desc, bw_res_desc;
+  /* Declares the datatypes and RPC message types of the bandwidth tests; no callback is registered here. */
+  /* Request payload: the peer host plus the buf/exp/msg sizes of the experiment */
+  bw_request_desc = gras_datadesc_struct("s_bw_request_t");
+  gras_datadesc_struct_append(bw_request_desc,"host",
+                             gras_datadesc_by_name("s_xbt_host_t"));
+  gras_datadesc_struct_append(bw_request_desc,"buf_size",
+                             gras_datadesc_by_name("unsigned long int"));
+  gras_datadesc_struct_append(bw_request_desc,"exp_size",
+                             gras_datadesc_by_name("unsigned long int"));
+  gras_datadesc_struct_append(bw_request_desc,"msg_size",
+                             gras_datadesc_by_name("unsigned long int"));
+  gras_datadesc_struct_close(bw_request_desc);
+  bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
+  /* Result payload. NOTE(review): registered as "seconds" but accessed as result->sec in this file -- confirm. */
+  bw_res_desc = gras_datadesc_struct("s_bw_res_t");
+  gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
+  gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
+  gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
+  gras_datadesc_struct_close(bw_res_desc);
+  bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
+  /* RPC types: "BW handshake" is answered with a bw_request_t, "BW request" with a bw_res_t */
+  gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
+  gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
+}
+void amok_bw_bw_join() { /* registers the callbacks serving the "BW request" and "BW handshake" messages */
+  gras_cb_register(gras_msgtype_by_name("BW request"),
+                  &amok_bw_cb_bw_request);
+  gras_cb_register(gras_msgtype_by_name("BW handshake"),
+                  &amok_bw_cb_bw_handshake);
+}
+void amok_bw_bw_leave() { /* unregisters the "BW request" and "BW handshake" callbacks */
+  gras_cb_unregister(gras_msgtype_by_name("BW request"),
+                    &amok_bw_cb_bw_request);
+  gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
+                    &amok_bw_cb_bw_handshake);
+}
 
 /**
  * \brief bandwidth measurement between localhost and \e peer
@@ -112,12 +102,12 @@ void amok_bw_exit(void) {
  * \arg exp_size: Total size of data sent across the network
  * \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
  * \arg sec: where the result (in seconds) should be stored.
- * \arg bw: observed Bandwidth (in kb/s) 
+ * \arg bw: observed Bandwidth (in bytes/s)
  *
  * Conduct a bandwidth test from the local process to the given peer.
  * This call is blocking until the end of the experiment.
  *
- * Results are reported in last args, and sizes are in kb.
+ * Results are reported in last args, and sizes are in bytes.
  */
 void amok_bw_test(gras_socket_t peer,
                  unsigned long int buf_size,
@@ -126,7 +116,7 @@ void amok_bw_test(gras_socket_t peer,
          /*OUT*/ double *sec, double *bw) {
 
   /* Measurement sockets for the experiments */
-  gras_socket_t measMasterIn=NULL,measIn,measOut;
+  gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
   int port;
   bw_request_t request,request_ack;
   xbt_ex_t e;
@@ -136,38 +126,31 @@ void amok_bw_test(gras_socket_t peer,
       measMasterIn = gras_socket_server_ext(++port,buf_size,1);
     } CATCH(e) {
       measMasterIn = NULL;
-      if (port < 10000) {
-       xbt_ex_free(e);
-      } else {
+      if (port == 10000 -1) {
        RETHROW0("Error caught while opening a measurement socket: %s");
+      } else {
+       xbt_ex_free(e); 
       }
     }
   }
   
   request=xbt_new0(s_bw_request_t,1);
-  request->buf_size=buf_size*1024;
-  request->exp_size=exp_size*1024;
-  request->msg_size=msg_size*1024;
+  request->buf_size=buf_size;
+  request->exp_size=exp_size;
+  request->msg_size=msg_size;
   request->host.name = NULL;
   request->host.port = gras_socket_my_port(measMasterIn);
-  VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)", 
+  VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)", 
        gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
        buf_size,request->buf_size);
 
   TRY {
-    gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request);
+    gras_msg_rpccall(peer,60,
+                    gras_msgtype_by_name("BW handshake"),&request, &request_ack);
   } CATCH(e) {
     RETHROW0("Error encountered while sending the BW request: %s");
   }
   measIn = gras_socket_meas_accept(measMasterIn);
-
-  TRY {
-    gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),NULL,&request_ack);
-  } CATCH(e) {
-    RETHROW0("Error encountered while waiting for the answer to BW request: %s");
-  }
-  
-  /* FIXME: What if there is a remote error? */
    
   TRY {
     measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
@@ -208,11 +191,12 @@ void amok_bw_test(gras_socket_t peer,
    receive the corresponding data on the measurement socket, 
    close the measurment socket
 
-   sizes are in byte (got converted from kb my expeditor)
+   sizes are in bytes
 */
-int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
+int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
                            void          *payload) {
-  gras_socket_t measMasterIn=NULL,measIn,measOut;
+  gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
+  gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
   bw_request_t request=*(bw_request_t*)payload;
   bw_request_t answer;
   xbt_ex_t e;
@@ -225,7 +209,7 @@ int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
   /* Build our answer */
   answer = xbt_new0(s_bw_request_t,1);
   
-  for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
+  for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
     TRY {
       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
     } CATCH(e) {
@@ -243,6 +227,15 @@ int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
   answer->msg_size=request->msg_size;
   answer->host.port=gras_socket_my_port(measMasterIn);
 
+  TRY {
+    gras_msg_rpcreturn(60,ctx,&answer);
+  } CATCH(e) { 
+    gras_socket_close(measMasterIn);
+    /* FIXME: tell error to remote */
+    RETHROW0("Error encountered while sending the answer: %s");
+  }
+
+
   /* Don't connect asap to leave time to other side to enter the accept() */
   TRY {
     measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
@@ -254,15 +247,6 @@ int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
     /* FIXME: tell error to remote */
   }
 
-  TRY {
-    gras_msg_send(expeditor, gras_msgtype_by_name("BW handshake ACK"), &answer);
-  } CATCH(e) { 
-    gras_socket_close(measMasterIn);
-    gras_socket_close(measOut);
-    /* FIXME: tell error to remote */
-    RETHROW0("Error encountered while sending the answer: %s");
-  }
-
   TRY {
     measIn = gras_socket_meas_accept(measMasterIn);
     DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
@@ -301,12 +285,12 @@ int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
  * \arg exp_size: Total size of data sent across the network
  * \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
  * \arg sec: where the result (in seconds) should be stored.
- * \arg bw: observed Bandwidth (in kb/s)
+ * \arg bw: observed Bandwidth (in bytes/s)
  *
  * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
  * This call is blocking until the end of the experiment.
  *
- * Results are reported in last args, and sizes are in kb.
+ * Results are reported in last args, and sizes are in bytes.
  */
 void amok_bw_request(const char* from_name,unsigned int from_port,
                     const char* to_name,unsigned int to_port,
@@ -329,28 +313,28 @@ void amok_bw_request(const char* from_name,unsigned int from_port,
   request->host.port = to_port;
 
   sock = gras_socket_client(from_name,from_port);
-  gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request);
-  free(request);
-
-  gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result);
+  gras_msg_rpccall(sock,240,gras_msgtype_by_name("BW request"),&request, &result);
   
-  *sec=result->sec;
-  *bw =result->bw;
+  if (sec)
+    *sec=result->sec;
+  if (bw)
+    *bw =result->bw;
 
   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
        from_name,from_port, to_name,to_port,
-       *sec,*bw);
+       *sec,((double)*bw)/1024.0);
 
   gras_socket_close(sock);
   free(result);
+  free(request);
 }
 
-int amok_bw_cb_bw_request(gras_socket_t    expeditor,
+int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
                          void            *payload) {
                          
   /* specification of the test to run, and our answer */
   bw_request_t request = *(bw_request_t*)payload;
-  bw_res_t result = xbt_new0(s_bw_res,1);
+  bw_res_t result = xbt_new0(s_bw_res_t,1);
   gras_socket_t peer;
 
   peer = gras_socket_client(request->host.name,request->host.port);
@@ -358,7 +342,7 @@ int amok_bw_cb_bw_request(gras_socket_t    expeditor,
               request->buf_size,request->exp_size,request->msg_size,
               &(result->sec),&(result->bw));
 
-  gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result);
+  gras_msg_rpcreturn(240,ctx,&result);
 
   gras_os_sleep(1);
   gras_socket_close(peer);
@@ -368,13 +352,25 @@ int amok_bw_cb_bw_request(gras_socket_t    expeditor,
   return 1;
 }
 
-int amok_bw_cb_sat_start(gras_socket_t     expeditor,
-                        void             *payload) {
-   CRITICAL0("amok_bw_cb_sat_start; not implemented");
-   return 1;
-} 
-int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
-                        void             *payload) {
-   CRITICAL0("amok_bw_cb_sat_begin: not implemented");
-   return 1;
+double * amok_bw_matrix(xbt_dynar_t hosts,
+                         int buf_size_bw, int exp_size_bw, int msg_size_bw) { 
+  double sec;
+  /* Build the len*len bandwidth matrix: entry [i*len + j] receives the bandwidth that
+   * amok_bw_request() reports from hosts[i] to hosts[j]. Only bandwidth is stored; the
+   * duration (sec) is discarded and no latency is computed. The matrix is returned. */
+  int i,j,len=xbt_dynar_length(hosts);
+
+  double *matrix_res = xbt_new0(double, len*len);
+  xbt_host_t h1,h2;
+
+  xbt_dynar_foreach (hosts,i,h1) {
+    xbt_dynar_foreach (hosts,j,h2) {
+      if (i!=j) {
+        /* Measure the bandwidth from h1 to h2; diagonal entries are never written */
+        amok_bw_request(h1->name,h1->port,h2->name,h2->port,
+                        buf_size_bw,exp_size_bw,msg_size_bw,&sec,&matrix_res[i*len + j]);
+      } 
+    }
+  }
+  return matrix_res;
 }