Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
seems to work in most cases. Yuhu
[simgrid.git] / src / amok / Bandwidth / saturate.c
index 4429bc4..995a43e 100644 (file)
@@ -30,8 +30,7 @@ void amok_bw_sat_init(void) {
   
   /* Register the saturation messages */
   gras_msgtype_declare_rpc("amok_bw_sat start", sat_request_desc, NULL);
-  gras_msgtype_declare_rpc("amok_bw_sat begin", sat_request_desc, NULL);
-  //  gras_msgtype_declare_rpc("amok_bw_sat end",   NULL,             NULL);
+  gras_msgtype_declare_rpc("amok_bw_sat begin", sat_request_desc, sat_request_desc);
   gras_msgtype_declare_rpc("amok_bw_sat stop",  NULL,             NULL);
 
 }
@@ -69,7 +68,7 @@ void amok_bw_sat_leave(void) {
  */
 void amok_bw_saturate_start(const char* from_name,unsigned int from_port,
                            const char* to_name,unsigned int to_port,
-                           unsigned int msg_size, unsigned int duration) {
+                           unsigned int msg_size, double duration) {
   gras_socket_t sock;
 
   sat_request_t request = xbt_new(s_sat_request_t,1);
@@ -92,6 +91,7 @@ void amok_bw_saturate_start(const char* from_name,unsigned int from_port,
 /* Asked to begin a saturation */
 static int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload){
   sat_request_t request = *(sat_request_t*)payload;
+  gras_msg_rpcreturn(60,ctx, NULL);
   amok_bw_saturate_begin(request->host.name,request->host.port,
                         request->msg_size, request->duration,
                         NULL,NULL);
@@ -107,14 +107,15 @@ static int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload){
  * is to have a remote host calling amok_bw_saturate_stop to this process.
  */
 void amok_bw_saturate_begin(const char* to_name,unsigned int to_port,
-                           unsigned int msg_size, unsigned int duration,
+                           unsigned int msg_size, double duration,
                            /*out*/ double *elapsed_res, double *bw_res) {
  
   xbt_ex_t e;
 
   gras_socket_t peer_cmd = gras_socket_client(to_name, to_port);
-  gras_socket_t measMaster=NULL,meas=NULL;
-  int port;
+  gras_msg_cb_ctx_t ctx;
+
+  gras_socket_t meas;
 
   s_gras_msg_t msg_got;
 
@@ -130,40 +131,23 @@ void amok_bw_saturate_begin(const char* to_name,unsigned int to_port,
   DEBUG2("Begin to saturate to %s:%d",to_name,to_port);
   memset(&msg_got,0,sizeof(msg_got));
 
-  for (port = 6000; port <= 10000 && measMaster == NULL; port++) {
-    TRY {
-      measMaster = gras_socket_server_ext(port,
-                                         0 /*bufsize: auto*/,
-                                         1 /*meas: true*/);
-    } CATCH(e) {
-      measMaster = NULL;
-      if (port < 10000)
-       xbt_ex_free(e);
-      else
-       RETHROW0("Error encountered while opening a measurement server socket: %s");
-    }
-  }
-
   request->msg_size = msg_size;
   request->duration = duration;
   request->host.name = NULL;
-  request->host.port = port;
+  request->host.port = 0;
 
-  gras_msg_cb_ctx_t ctx = gras_msg_rpc_async_call(peer_cmd, 60, 
-                                 gras_msgtype_by_name("amok_bw_sat begin"),
+  ctx = gras_msg_rpc_async_call(peer_cmd, 60, 
+                               gras_msgtype_by_name("amok_bw_sat begin"),
                                                  &request);
-  gras_socket_close(peer_cmd);
-  INFO1("Async called %d",port);
-
-  TRY {
-    meas = gras_socket_meas_accept(measMaster);
-    DEBUG0("saturation handshake answered");
-  } CATCH(e) {
-    gras_socket_close(measMaster);
-    RETHROW0("Error during saturation handshake: %s");
-  }  
-  INFO0("Accepted");
+  free(request);
   gras_msg_rpc_async_wait(ctx,&request);
+  meas=gras_socket_client_ext( to_name, request->host.port,
+                              0 /*bufsize: auto*/,
+                              1 /*meas: true*/);
+  free(request);
+
+  gras_socket_close(peer_cmd);
+  INFO2("Saturation from %s to %s started",gras_os_myname(),to_name);
 
   /* Start experiment */
   start=gras_os_time();
@@ -190,9 +174,11 @@ void amok_bw_saturate_begin(const char* to_name,unsigned int to_port,
 
     /* Check whether the experiment has to be terminated by now */
     elapsed=gras_os_time()-start;
+    INFO2("elapsed %f duration %f",elapsed, duration);
 
   } while (saturate_further && elapsed < duration);
 
+  INFO2("Saturation from %s to %s stopped",gras_os_myname(),to_name);
   bw = ((double)(packet_sent*msg_size)) / elapsed;
 
   if (elapsed_res)
@@ -222,33 +208,59 @@ void amok_bw_saturate_begin(const char* to_name,unsigned int to_port,
     free(answer);
   }
   
-  gras_socket_close(measMaster);
   gras_socket_close(meas);
 }
 
 /* Sender will saturate link to us */
 static int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx, void *payload){
-  gras_socket_t from=gras_msg_cb_ctx_from(ctx);
   sat_request_t request=*(sat_request_t*)payload;
+  sat_request_t answer=xbt_new0(s_sat_request_t,1);
   volatile int saturate_further = 1;
   xbt_ex_t e;
+  gras_socket_t measMaster=NULL,meas=NULL;
+
+  int port=6000;
+  while (port <= 10000 && measMaster == NULL) {
+    TRY {
+      measMaster = gras_socket_server_ext(port,
+                                         0 /*bufsize: auto*/,
+                                         1 /*meas: true*/);
+    } CATCH(e) {
+      measMaster = NULL;
+      if (port < 10000)
+       xbt_ex_free(e);
+      else
+       RETHROW0("Error encountered while opening a measurement server socket: %s");
+    }
+    if (measMaster == NULL) 
+      port++; /* prepare for a new loop */
+  }
+  answer->host.port=port;
+
+  gras_msg_rpcreturn(60, ctx, &answer);
+  free(answer);
 
   gras_os_sleep(5); /* Wait for the accept */
-  gras_socket_t sock=gras_socket_client_ext( gras_socket_peer_name(from),
-                                            request->host.port,
-                                            0 /*bufsize: auto*/,
-                                            1 /*meas: true*/);
-  gras_msg_rpcreturn(60, ctx, NULL);
+
+  TRY {
+    meas = gras_socket_meas_accept(measMaster);
+    DEBUG0("saturation handshake answered");
+  } CATCH(e) {
+    gras_socket_close(measMaster);
+    RETHROW0("Error during saturation handshake: %s");
+  }  
 
   while (saturate_further) {
     TRY {
-      gras_socket_meas_send(sock,120,request->msg_size,request->msg_size);
+      gras_socket_meas_recv(meas,120,request->msg_size,request->msg_size);
     } CATCH(e) {
       saturate_further = 0;
       xbt_ex_free(e);
     }
   }
-  gras_socket_close(sock);
+  INFO1("Saturation stopped on %s",gras_os_myname());
+  gras_socket_close(meas);
+  gras_socket_close(measMaster);
   free(request);
   return 1;
 }