Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
* Better split between bandwidth and saturation sub-modules
author mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 4 Apr 2006 14:29:24 +0000 (14:29 +0000)
committer mquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 4 Apr 2006 14:29:24 +0000 (14:29 +0000)
* Sizes are now in bytes in GRAS
* in amok_bw_matrix, no need to free the iterators: it won't work
* first draft of the saturation stuff (not really functional yet)

git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@2075 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/amok/Bandwidth/bandwidth.c
src/amok/Bandwidth/bandwidth_private.h
src/amok/Bandwidth/saturate.c

index c9601a0..a6fbf24 100644 (file)
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
 
+
+/******************************
+ * Stuff global to the module *
+ ******************************/
+
 static short _amok_bw_initialized = 0;
 
 /** @brief module initialization; all participating nodes must run this */
 void amok_bw_init(void) {
-  gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
 
   amok_base_init();
 
   if (! _amok_bw_initialized) {
-       
-     /* Build the Bandwidth datatype descriptions */ 
-     bw_request_desc = gras_datadesc_struct("s_bw_request_t");
-     gras_datadesc_struct_append(bw_request_desc,"host",
-                                gras_datadesc_by_name("xbt_host_t"));
-     gras_datadesc_struct_append(bw_request_desc,"buf_size",
-                                gras_datadesc_by_name("unsigned long int"));
-     gras_datadesc_struct_append(bw_request_desc,"exp_size",
-                                gras_datadesc_by_name("unsigned long int"));
-     gras_datadesc_struct_append(bw_request_desc,"msg_size",
-                                gras_datadesc_by_name("unsigned long int"));
-     gras_datadesc_struct_close(bw_request_desc);
-     bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
-
-     bw_res_desc = gras_datadesc_struct("s_bw_res_t");
-     gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
-     gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
-     gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
-     gras_datadesc_struct_close(bw_res_desc);
-     bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
-
-     gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
-     gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
-
-     /* Build the saturation datatype descriptions */ 
-     sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
-     gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
-     gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
-     gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
-     gras_datadesc_struct_close(sat_request_desc);
-     sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
-     
-     /* Register the saturation messages */
-     gras_msgtype_declare("SAT start",   sat_request_desc);
-     gras_msgtype_declare("SAT started", NULL);
-     gras_msgtype_declare("SAT begin",   sat_request_desc);
-     gras_msgtype_declare("SAT begun",   NULL);
-     gras_msgtype_declare("SAT end",     NULL);
-     gras_msgtype_declare("SAT ended",   NULL);
-     gras_msgtype_declare("SAT stop",    NULL);
-     gras_msgtype_declare("SAT stopped", NULL);
+    amok_bw_bw_init();
+    amok_bw_sat_init();
   }
    
-  /* Register the callbacks */
-  gras_cb_register(gras_msgtype_by_name("BW request"),
-                  &amok_bw_cb_bw_request);
-  gras_cb_register(gras_msgtype_by_name("BW handshake"),
-                  &amok_bw_cb_bw_handshake);
+  amok_bw_bw_join();
+  amok_bw_sat_join();
 
-  gras_cb_register(gras_msgtype_by_name("SAT start"),
-                  &amok_bw_cb_sat_start);
-  gras_cb_register(gras_msgtype_by_name("SAT begin"),
-                  &amok_bw_cb_sat_begin);
-  
-  _amok_bw_initialized =1;
+  _amok_bw_initialized++;
 }
 
 /** @brief module finalization */
@@ -86,22 +43,56 @@ void amok_bw_exit(void) {
   if (! _amok_bw_initialized)
     return;
    
-  gras_cb_unregister(gras_msgtype_by_name("BW request"),
-                    &amok_bw_cb_bw_request);
-  gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
-                    &amok_bw_cb_bw_handshake);
+  amok_bw_bw_leave();
+  amok_bw_sat_leave();
 
-  gras_cb_unregister(gras_msgtype_by_name("SAT start"),
-                    &amok_bw_cb_sat_start);
-  gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
-                    &amok_bw_cb_sat_begin);
-
-  _amok_bw_initialized = 0;
+  _amok_bw_initialized--;
 }
 
 /* ***************************************************************************
  * Bandwidth tests
  * ***************************************************************************/
+static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
+static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
+
+void amok_bw_bw_init() {
+  gras_datadesc_type_t bw_request_desc, bw_res_desc;
+
+  /* Build the Bandwidth datatype descriptions */ 
+  bw_request_desc = gras_datadesc_struct("s_bw_request_t");
+  gras_datadesc_struct_append(bw_request_desc,"host",
+                             gras_datadesc_by_name("s_xbt_host_t"));
+  gras_datadesc_struct_append(bw_request_desc,"buf_size",
+                             gras_datadesc_by_name("unsigned long int"));
+  gras_datadesc_struct_append(bw_request_desc,"exp_size",
+                             gras_datadesc_by_name("unsigned long int"));
+  gras_datadesc_struct_append(bw_request_desc,"msg_size",
+                             gras_datadesc_by_name("unsigned long int"));
+  gras_datadesc_struct_close(bw_request_desc);
+  bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
+  
+  bw_res_desc = gras_datadesc_struct("s_bw_res_t");
+  gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
+  gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
+  gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
+  gras_datadesc_struct_close(bw_res_desc);
+  bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
+  
+  gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
+  gras_msgtype_declare_rpc("BW request",  bw_request_desc,bw_res_desc);
+}
+void amok_bw_bw_join() {
+  gras_cb_register(gras_msgtype_by_name("BW request"),
+                  &amok_bw_cb_bw_request);
+  gras_cb_register(gras_msgtype_by_name("BW handshake"),
+                  &amok_bw_cb_bw_handshake);
+}
+void amok_bw_bw_leave() {
+  gras_cb_unregister(gras_msgtype_by_name("BW request"),
+                    &amok_bw_cb_bw_request);
+  gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
+                    &amok_bw_cb_bw_handshake);
+}
 
 /**
  * \brief bandwidth measurement between localhost and \e peer
@@ -218,7 +209,7 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
   /* Build our answer */
   answer = xbt_new0(s_bw_request_t,1);
   
-  for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
+  for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
     TRY {
       measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
     } CATCH(e) {
@@ -236,8 +227,6 @@ int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t  ctx,
   answer->msg_size=request->msg_size;
   answer->host.port=gras_socket_my_port(measMasterIn);
 
-
-
   TRY {
     gras_msg_rpcreturn(60,ctx,&answer);
   } CATCH(e) { 
@@ -331,10 +320,11 @@ void amok_bw_request(const char* from_name,unsigned int from_port,
 
   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
        from_name,from_port, to_name,to_port,
-       *sec,*bw);
+       *sec,((double)*bw)/1024.0);
 
   gras_socket_close(sock);
   free(result);
+  free(request);
 }
 
 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
@@ -342,7 +332,7 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
                          
   /* specification of the test to run, and our answer */
   bw_request_t request = *(bw_request_t*)payload;
-  bw_res_t result = xbt_new0(s_bw_res,1);
+  bw_res_t result = xbt_new0(s_bw_res_t,1);
   gras_socket_t peer;
 
   peer = gras_socket_client(request->host.name,request->host.port);
@@ -360,47 +350,25 @@ int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
   return 1;
 }
 
-int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx,
-                        void             *payload) {
-   CRITICAL0("amok_bw_cb_sat_start; not implemented");
-   return 1;
-} 
-int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx,
-                        void             *payload) {
-   CRITICAL0("amok_bw_cb_sat_begin: not implemented");
-   return 1;
-}
-
 double * amok_bw_matrix(xbt_dynar_t hosts,
                          int buf_size_bw, int exp_size_bw, int msg_size_bw) { 
   double sec;
   /* construct of matrixs for bandwith and Latency */
 
 
-  double *matrix_bw;   /* matrix bandwidth */
   int i,j,len=xbt_dynar_length(hosts);
 
-  matrix_bw = (double *) malloc(sizeof(double)* len*len);
-
+  double *matrix_res = xbt_new0(double, len*len);
   xbt_host_t h1,h2;
 
-  h1 = xbt_new(s_xbt_host_t,1);
-  h2 = xbt_new(s_xbt_host_t,1);
-  i=0;
   xbt_dynar_foreach (hosts,i,h1) {
-    j=0;
     xbt_dynar_foreach (hosts,j,h2) {
-      if(i!=j) {
+      if (i!=j) {
         /* Mesurements of Bandwidth */
         amok_bw_request(h1->name,h1->port,h2->name,h2->port,
-                        buf_size_bw,exp_size_bw,msg_size_bw,&sec,&matrix_bw[i*len + j]);
-      } else {
-        matrix_bw[i*len +j] = 0.0;
-      }
-
+                        buf_size_bw,exp_size_bw,msg_size_bw,&sec,&matrix_res[i*len + j]);
+      } 
     }
   }
-  free(h1);
-  free(h2);
-  return matrix_bw;
+  return matrix_res;
 }
index 4eb673f..0fa86f0 100644 (file)
 #include "gras.h"
 #include "amok/bandwidth.h"
 
+void amok_bw_bw_init(void); /* Must be called only once per node */
+void amok_bw_bw_join(void); /* Each process must run it */
+void amok_bw_bw_leave(void);/* Each process must run it */
+
+void amok_bw_sat_init(void); /* Must be called only once per node */
+void amok_bw_sat_join(void); /* Each process must run it */
+void amok_bw_sat_leave(void);/* Each process must run it */
+
+/***
+ * Plain bandwidth measurement stuff
+ ***/
+
 /* Request for a BW experiment.
  * If host==NULL, it should be between the sender and the receiver.
  * If not, it should be between the receiver and host (3-tiers).
@@ -27,29 +39,28 @@ typedef struct {
   unsigned long int msg_size;
 } s_bw_request_t,*bw_request_t;
 
-/* Result of a BW experiment (payload when answering).
- * if err.msg != NULL, it wasn't sucessful. Check err.msg and err.code to see why.
- */
+/* Result of a BW experiment (payload when answering). */
 typedef struct {
   unsigned int timestamp;
   double sec;
   double bw;
-} s_bw_res,*bw_res_t;
+} s_bw_res_t,*bw_res_t;
+
 
+/***
+ * Saturation stuff
+ ***/
 
 /* Description of a saturation experiment (payload asking some host to collaborate for that)
  */
 typedef struct {
   s_xbt_host_t host; /* host+raw socket to use */
   unsigned int msg_size;
-  unsigned int timeout;
+  unsigned int duration;
 } s_sat_request_t,*sat_request_t;
 
-/* Prototypes of local callbacks */
-int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
-int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
-
-int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload);
-int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx, void *payload);
+void amok_bw_sat_start(const char* from_name,unsigned int from_port,
+                      const char* to_name,unsigned int to_port,
+                      unsigned int msg_size, unsigned int duration);
 
 #endif /* AMOK_BANDWIDTH_PRIVATE_H */
index aa632b0..4429bc4 100644 (file)
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include "amok/Bandwidth/bandwidth_private.h"
+#include "gras/Msg/msg_private.h" /* FIXME: This mucks with contextes to answer RPC directly */
 
-#if 0
 XBT_LOG_EXTERNAL_CATEGORY(bw);
 XBT_LOG_DEFAULT_CATEGORY(bw);
 
+static int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload);
+static int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx, void *payload);
+
+
+void amok_bw_sat_init(void) {
+  gras_datadesc_type_t sat_request_desc;
+  /* Build the saturation datatype descriptions */ 
+  
+  sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
+  gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("s_xbt_host_t"));
+  gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
+  gras_datadesc_struct_append(sat_request_desc,"duration",gras_datadesc_by_name("unsigned int"));
+  gras_datadesc_struct_close(sat_request_desc);
+  sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
+  
+  /* Register the saturation messages */
+  gras_msgtype_declare_rpc("amok_bw_sat start", sat_request_desc, NULL);
+  gras_msgtype_declare_rpc("amok_bw_sat begin", sat_request_desc, NULL);
+  //  gras_msgtype_declare_rpc("amok_bw_sat end",   NULL,             NULL);
+  gras_msgtype_declare_rpc("amok_bw_sat stop",  NULL,             NULL);
+
+}
+void amok_bw_sat_join(void) {
+  gras_cb_register(gras_msgtype_by_name("amok_bw_sat start"),
+                  &amok_bw_cb_sat_start);
+  gras_cb_register(gras_msgtype_by_name("amok_bw_sat begin"),
+                  &amok_bw_cb_sat_begin);
+}
+void amok_bw_sat_leave(void) {
+  gras_cb_unregister(gras_msgtype_by_name("amok_bw_sat start"),
+                    &amok_bw_cb_sat_start);
+  gras_cb_unregister(gras_msgtype_by_name("amok_bw_sat begin"),
+                    &amok_bw_cb_sat_begin);
+}
+
 /* ***************************************************************************
  * Link saturation
  * ***************************************************************************/
 
-xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
-                                  const char* to_name,unsigned int to_port,
-                                  unsigned int msgSize, unsigned int timeout) {
-  gras_sock_t *sock;
-  xbt_error_t errcode;
-  /* The request */
-  SatExp_t *request;
-  msgHost_t *target;
-  /* answer */
-  gras_msg_t *answer;
+/**
+ * @brief Ask 'from_name:from_port' to start saturating the link going to to_name:to_port.
+ *
+ * @from_name: Name of the host we are asking to do an experiment with (to_name:to_port)
+ * @from_port: port on which the process we are asking for an experiment is listening
+ * (for messages; do not give a raw socket here. The needed raw socket will be negotiated
+ * between the peers)
+ * @to_name: Name of the host with which we should conduct the experiment
+ * @to_port: port on which the peer process is listening for messages
+ * @msg_size: Size of each message sent.
+ * @duration: Maximum duration of the saturation.
+ *
+ * Ask the process 'from_name:from_port' to start to saturate the link between itself
+ * and to_name:to_port.
+ */
+void amok_bw_saturate_start(const char* from_name,unsigned int from_port,
+                           const char* to_name,unsigned int to_port,
+                           unsigned int msg_size, unsigned int duration) {
+  gras_socket_t sock;
+
+  sat_request_t request = xbt_new(s_sat_request_t,1);
+
+  sock = gras_socket_client(from_name,from_port);
+
+  request->host.name = (char*)to_name;
+  request->host.port = to_port;
+  
+  request->duration=duration;
+  request->msg_size=msg_size;
 
-  if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
-    fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
-           __FILE__,__LINE__,xbt_error_name(errcode));
-    return errcode;
+
+  gras_msg_rpccall(sock,60,gras_msgtype_by_name("amok_bw_sat start"),&request, NULL);
+
+  free(request);
+  gras_socket_close(sock);
+}
+
+/* Asked to begin a saturation */
+static int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload){
+  sat_request_t request = *(sat_request_t*)payload;
+  amok_bw_saturate_begin(request->host.name,request->host.port,
+                        request->msg_size, request->duration,
+                        NULL,NULL);
+  free(request->host.name);
+  free(request);
+  return 1;
+}
+
+/**
+ * @brief Start saturating between the current process and the designated peer
+ *
+ * Note that the only way to break out of this function before the end of the
+ * requested duration is to have a remote host call amok_bw_saturate_stop on this process.
+ */
+void amok_bw_saturate_begin(const char* to_name,unsigned int to_port,
+                           unsigned int msg_size, unsigned int duration,
+                           /*out*/ double *elapsed_res, double *bw_res) {
+  xbt_ex_t e;
+
+  gras_socket_t peer_cmd = gras_socket_client(to_name, to_port);
+  gras_socket_t measMaster=NULL,meas=NULL;
+  int port;
+
+  s_gras_msg_t msg_got;
+
+  unsigned int packet_sent=0;
+  double start,elapsed=-1; /* timer */
+  double bw;
+
+  volatile int saturate_further; /* boolean in the main loop */ 
+
+  /* Negociate the saturation with the peer */
+  sat_request_t request = xbt_new(s_sat_request_t,1);
+
+  DEBUG2("Begin to saturate to %s:%d",to_name,to_port);
+  memset(&msg_got,0,sizeof(msg_got));
+
+  for (port = 6000; port <= 10000 && measMaster == NULL; port++) {
+    TRY {
+      measMaster = gras_socket_server_ext(port,
+                                         0 /*bufsize: auto*/,
+                                         1 /*meas: true*/);
+    } CATCH(e) {
+      measMaster = NULL;
+      if (port < 10000)
+       xbt_ex_free(e);
+      else
+       RETHROW0("Error encountered while opening a measurement server socket: %s");
+    }
   }
-  if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
-      !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
-    fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
-    gras_sock_close(sock);
-    return malloc_error;    
+
+  request->msg_size = msg_size;
+  request->duration = duration;
+  request->host.name = NULL;
+  request->host.port = port;
+
+  gras_msg_cb_ctx_t ctx = gras_msg_rpc_async_call(peer_cmd, 60, 
+                                 gras_msgtype_by_name("amok_bw_sat begin"),
+                                                 &request);
+  gras_socket_close(peer_cmd);
+  INFO1("Async called %d",port);
+
+  TRY {
+    meas = gras_socket_meas_accept(measMaster);
+    DEBUG0("saturation handshake answered");
+  } CATCH(e) {
+    gras_socket_close(measMaster);
+    RETHROW0("Error during saturation handshake: %s");
+  }  
+  INFO0("Accepted");
+  gras_msg_rpc_async_wait(ctx,&request);
+
+  /* Start experiment */
+  start=gras_os_time();
+
+  do {
+    /* do send it */
+    gras_socket_meas_send(meas,120,msg_size,msg_size);
+    packet_sent++;
+
+    /* Check whether someone asked us to stop saturation */
+    saturate_further = 0;
+    TRY {
+      gras_msg_wait_ext(0/*no wait*/,gras_msgtype_by_name("amok_bw_sat stop"),
+                       NULL /* accept any sender */,
+                       NULL, NULL, /* No specific filter */
+                       &msg_got);
+    } CATCH(e) {
+      if (e.category == timeout_error) {
+       saturate_further=1;
+       memset(&msg_got,0,sizeof(msg_got)); /* may be overprotectiv here */
+      }
+      xbt_ex_free(e);
+    }
+
+    /* Check whether the experiment has to be terminated by now */
+    elapsed=gras_os_time()-start;
+
+  } while (saturate_further && elapsed < duration);
+
+  bw = ((double)(packet_sent*msg_size)) / elapsed;
+
+  if (elapsed_res)
+    *elapsed_res = elapsed;
+  if (bw_res)
+    *bw_res = bw;
+
+  if (elapsed >= duration) {
+    INFO2("Saturation experiment terminated. Took %f sec (achieving %f kb/s)",
+         elapsed, bw/1024.0);
   }
 
-  request->timeout=timeout;
-  request->msgSize=msgSize;
+  /* If someone stopped us, inform him about the achieved bandwidth */
+  if (msg_got.expe) {
+    bw_res_t answer = xbt_new(s_bw_res_t,1);
+    s_gras_msg_cb_ctx_t ctx;
 
-  strcpy(target->host,to_name);
-  target->port=to_port;
+    answer->timestamp=gras_os_time();
+    answer->sec=elapsed;
+    answer->bw=bw;
 
-  if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
-                             target,1,
-                             request,1))) {
-    fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
-           __FILE__,__LINE__,xbt_error_name(errcode));
-    gras_sock_close(sock);
-    return errcode;
-  }
-  if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
-    fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
-           __FILE__,__LINE__,xbt_error_name(errcode));
-    gras_sock_close(sock);
-    return errcode;
+    ctx.expeditor = msg_got.expe;
+    ctx.ID = msg_got.ID;
+    ctx.msgtype = msg_got.type;
+
+    gras_msg_rpcreturn(60,&ctx,&answer);
+    free(answer);
   }
+  
+  gras_socket_close(measMaster);
+  gras_socket_close(meas);
+}
 
-  if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
-    fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
-           __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
-    gras_msg_free(answer);
-    gras_sock_close(sock);
-    return errcode;
+/* Sender will saturate link to us */
+static int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx, void *payload){
+  gras_socket_t from=gras_msg_cb_ctx_from(ctx);
+  sat_request_t request=*(sat_request_t*)payload;
+  volatile int saturate_further = 1;
+  xbt_ex_t e;
+
+  gras_os_sleep(5); /* Wait for the accept */
+  gras_socket_t sock=gras_socket_client_ext( gras_socket_peer_name(from),
+                                            request->host.port,
+                                            0 /*bufsize: auto*/,
+                                            1 /*meas: true*/);
+  gras_msg_rpcreturn(60, ctx, NULL);
+
+  while (saturate_further) {
+    TRY {
+      gras_socket_meas_send(sock,120,request->msg_size,request->msg_size);
+    } CATCH(e) {
+      saturate_further = 0;
+      xbt_ex_free(e);
+    }
   }
+  gras_socket_close(sock);
+  free(request);
+  return 1;
+}
 
-  gras_msg_free(answer);
-  gras_sock_close(sock);
-  return no_error;
+/**
+ * @brief Ask 'from_name:from_port' to stop any saturation experiments
+ * @from_name: Name of the host we are asking to stop its saturation experiments
+ * @from_port: port on which that process is listening for messages
+ * @time: the duration of the experiment (NOTE: not filled in by the code below yet)
+ * @bw: the achieved bandwidth (NOTE: not filled in by the code below yet)
+ *
+ */
+void amok_bw_saturate_stop(const char* from_name,unsigned int from_port,
+                          /*out*/ unsigned int *time, unsigned int *bw) {
+
+  gras_socket_t sock = gras_socket_client(from_name,from_port);
+  gras_msg_rpccall(sock,60,gras_msgtype_by_name("amok_bw_sat stop"),NULL,NULL);
+  gras_socket_close(sock);
 }
 
+
+#if 0
 int grasbw_cbSatStart(gras_msg_t *msg) {
   gras_rawsock_t *raw;
   gras_sock_t *sock;
@@ -93,12 +290,6 @@ int grasbw_cbSatStart(gras_msg_t *msg) {
   /* answer */
   gras_msg_t *answer;
 
-  /*
-  fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
-  fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
-         msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
-         msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
-  */
 
   /* Negociate the saturation with the peer */
   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {