From 867f198b03e6e87da45c60a55e3dde0c88cf0926 Mon Sep 17 00:00:00 2001 From: mquinson Date: Tue, 4 Apr 2006 22:42:39 +0000 Subject: [PATCH] seems to work in most cases. Yuhu git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@2085 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/amok/Bandwidth/saturate.c | 98 ++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 43 deletions(-) diff --git a/src/amok/Bandwidth/saturate.c b/src/amok/Bandwidth/saturate.c index 4429bc4087..995a43e381 100644 --- a/src/amok/Bandwidth/saturate.c +++ b/src/amok/Bandwidth/saturate.c @@ -30,8 +30,7 @@ void amok_bw_sat_init(void) { /* Register the saturation messages */ gras_msgtype_declare_rpc("amok_bw_sat start", sat_request_desc, NULL); - gras_msgtype_declare_rpc("amok_bw_sat begin", sat_request_desc, NULL); - // gras_msgtype_declare_rpc("amok_bw_sat end", NULL, NULL); + gras_msgtype_declare_rpc("amok_bw_sat begin", sat_request_desc, sat_request_desc); gras_msgtype_declare_rpc("amok_bw_sat stop", NULL, NULL); } @@ -69,7 +68,7 @@ void amok_bw_sat_leave(void) { */ void amok_bw_saturate_start(const char* from_name,unsigned int from_port, const char* to_name,unsigned int to_port, - unsigned int msg_size, unsigned int duration) { + unsigned int msg_size, double duration) { gras_socket_t sock; sat_request_t request = xbt_new(s_sat_request_t,1); @@ -92,6 +91,7 @@ void amok_bw_saturate_start(const char* from_name,unsigned int from_port, /* Asked to begin a saturation */ static int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload){ sat_request_t request = *(sat_request_t*)payload; + gras_msg_rpcreturn(60,ctx, NULL); amok_bw_saturate_begin(request->host.name,request->host.port, request->msg_size, request->duration, NULL,NULL); @@ -107,14 +107,15 @@ static int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload){ * is to have a remote host calling amok_bw_saturate_stop to this process. */ void amok_bw_saturate_begin(const char* to_name,unsigned int to_port, - unsigned int msg_size, unsigned int duration, + unsigned int msg_size, double duration, /*out*/ double *elapsed_res, double *bw_res) { xbt_ex_t e; gras_socket_t peer_cmd = gras_socket_client(to_name, to_port); - gras_socket_t measMaster=NULL,meas=NULL; - int port; + gras_msg_cb_ctx_t ctx; + + gras_socket_t meas; s_gras_msg_t msg_got; @@ -130,40 +131,23 @@ void amok_bw_saturate_begin(const char* to_name,unsigned int to_port, DEBUG2("Begin to saturate to %s:%d",to_name,to_port); memset(&msg_got,0,sizeof(msg_got)); - for (port = 6000; port <= 10000 && measMaster == NULL; port++) { - TRY { - measMaster = gras_socket_server_ext(port, - 0 /*bufsize: auto*/, - 1 /*meas: true*/); - } CATCH(e) { - measMaster = NULL; - if (port < 10000) - xbt_ex_free(e); - else - RETHROW0("Error encountered while opening a measurement server socket: %s"); - } - } - request->msg_size = msg_size; request->duration = duration; request->host.name = NULL; - request->host.port = port; + request->host.port = 0; - gras_msg_cb_ctx_t ctx = gras_msg_rpc_async_call(peer_cmd, 60, - gras_msgtype_by_name("amok_bw_sat begin"), + ctx = gras_msg_rpc_async_call(peer_cmd, 60, + gras_msgtype_by_name("amok_bw_sat begin"), &request); - gras_socket_close(peer_cmd); - INFO1("Async called %d",port); - - TRY { - meas = gras_socket_meas_accept(measMaster); - DEBUG0("saturation handshake answered"); - } CATCH(e) { - gras_socket_close(measMaster); - RETHROW0("Error during saturation handshake: %s"); - } - INFO0("Accepted"); + free(request); gras_msg_rpc_async_wait(ctx,&request); + meas=gras_socket_client_ext( to_name, request->host.port, + 0 /*bufsize: auto*/, + 1 /*meas: true*/); + free(request); + + gras_socket_close(peer_cmd); + INFO2("Saturation from %s to %s started",gras_os_myname(),to_name); /* Start experiment */ start=gras_os_time(); @@ -190,9 +174,11 @@ void amok_bw_saturate_begin(const char* to_name,unsigned int to_port, /* Check whether the experiment has to be terminated by now */ elapsed=gras_os_time()-start; + INFO2("elapsed %f duration %f",elapsed, duration); } while (saturate_further && elapsed < duration); + INFO2("Saturation from %s to %s stopped",gras_os_myname(),to_name); bw = ((double)(packet_sent*msg_size)) / elapsed; if (elapsed_res) @@ -222,33 +208,59 @@ void amok_bw_saturate_begin(const char* to_name,unsigned int to_port, free(answer); } - gras_socket_close(measMaster); gras_socket_close(meas); } /* Sender will saturate link to us */ static int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx, void *payload){ - gras_socket_t from=gras_msg_cb_ctx_from(ctx); sat_request_t request=*(sat_request_t*)payload; + sat_request_t answer=xbt_new0(s_sat_request_t,1); volatile int saturate_further = 1; xbt_ex_t e; + gras_socket_t measMaster=NULL,meas=NULL; + + int port=6000; + while (port <= 10000 && measMaster == NULL) { + TRY { + measMaster = gras_socket_server_ext(port, + 0 /*bufsize: auto*/, + 1 /*meas: true*/); + } CATCH(e) { + measMaster = NULL; + if (port < 10000) + xbt_ex_free(e); + else + RETHROW0("Error encountered while opening a measurement server socket: %s"); + } + if (measMaster == NULL) + port++; /* prepare for a new loop */ + } + answer->host.port=port; + + gras_msg_rpcreturn(60, ctx, &answer); + free(answer); gras_os_sleep(5); /* Wait for the accept */ - gras_socket_t sock=gras_socket_client_ext( gras_socket_peer_name(from), - request->host.port, - 0 /*bufsize: auto*/, - 1 /*meas: true*/); - gras_msg_rpcreturn(60, ctx, NULL); + + TRY { + meas = gras_socket_meas_accept(measMaster); + DEBUG0("saturation handshake answered"); + } CATCH(e) { + gras_socket_close(measMaster); + RETHROW0("Error during saturation handshake: %s"); + } while (saturate_further) { TRY { - gras_socket_meas_send(sock,120,request->msg_size,request->msg_size); + gras_socket_meas_recv(meas,120,request->msg_size,request->msg_size); } CATCH(e) { saturate_further = 0; xbt_ex_free(e); } } - gras_socket_close(sock); + INFO1("Saturation stopped on %s",gras_os_myname()); + gras_socket_close(meas); + gras_socket_close(measMaster); free(request); return 1; } -- 2.20.1