/* Register the saturation messages */
gras_msgtype_declare_rpc("amok_bw_sat start", sat_request_desc, NULL);
- gras_msgtype_declare_rpc("amok_bw_sat begin", sat_request_desc, NULL);
- // gras_msgtype_declare_rpc("amok_bw_sat end", NULL, NULL);
+ gras_msgtype_declare_rpc("amok_bw_sat begin", sat_request_desc, sat_request_desc);
gras_msgtype_declare_rpc("amok_bw_sat stop", NULL, NULL);
}
/**
* @brief Ask 'from_name:from_port' to stop saturating going to to_name:to_name.
*
- * @from_name: Name of the host we are asking to do a experiment with (to_name:to_port)
- * @from_port: port on which the process we are asking for an experiment is listening
+ * @param from_name: Name of the host we are asking to do a experiment with (to_name:to_port)
+ * @param from_port: port on which the process we are asking for an experiment is listening
* (for message, do not give a raw socket here. The needed raw socket will be negociated
* between the peers)
- * @to_name: Name of the host with which we should conduct the experiment
- * @to_port: port on which the peer process is listening for message
- * @msg_size: Size of each message sent.
- * @duration: How long in maximum should be the saturation.
+ * @param to_name: Name of the host with which we should conduct the experiment
+ * @param to_port: port on which the peer process is listening for message
+ * @param msg_size: Size of each message sent.
+ * @param duration: How long in maximum should be the saturation.
*
* Ask the process 'from_name:from_port' to start to saturate the link between itself
* and to_name:to_name.
*/
void amok_bw_saturate_start(const char* from_name,unsigned int from_port,
const char* to_name,unsigned int to_port,
- unsigned int msg_size, unsigned int duration) {
+ unsigned int msg_size, double duration) {
gras_socket_t sock;
sat_request_t request = xbt_new(s_sat_request_t,1);
/* Asked to begin a saturation */
static int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload){
sat_request_t request = *(sat_request_t*)payload;
+ gras_msg_rpcreturn(60,ctx, NULL);
amok_bw_saturate_begin(request->host.name,request->host.port,
request->msg_size, request->duration,
NULL,NULL);
* is to have a remote host calling amok_bw_saturate_stop to this process.
*/
void amok_bw_saturate_begin(const char* to_name,unsigned int to_port,
- unsigned int msg_size, unsigned int duration,
+ unsigned int msg_size, double duration,
/*out*/ double *elapsed_res, double *bw_res) {
xbt_ex_t e;
gras_socket_t peer_cmd = gras_socket_client(to_name, to_port);
- gras_socket_t measMaster=NULL,meas=NULL;
- int port;
+ gras_msg_cb_ctx_t ctx;
+
+ gras_socket_t meas;
s_gras_msg_t msg_got;
DEBUG2("Begin to saturate to %s:%d",to_name,to_port);
memset(&msg_got,0,sizeof(msg_got));
- for (port = 6000; port <= 10000 && measMaster == NULL; port++) {
- TRY {
- measMaster = gras_socket_server_ext(port,
- 0 /*bufsize: auto*/,
- 1 /*meas: true*/);
- } CATCH(e) {
- measMaster = NULL;
- if (port < 10000)
- xbt_ex_free(e);
- else
- RETHROW0("Error encountered while opening a measurement server socket: %s");
- }
- }
-
request->msg_size = msg_size;
request->duration = duration;
request->host.name = NULL;
- request->host.port = port;
+ request->host.port = 0;
- gras_msg_cb_ctx_t ctx = gras_msg_rpc_async_call(peer_cmd, 60,
- gras_msgtype_by_name("amok_bw_sat begin"),
+ ctx = gras_msg_rpc_async_call(peer_cmd, 60,
+ gras_msgtype_by_name("amok_bw_sat begin"),
&request);
- gras_socket_close(peer_cmd);
- INFO1("Async called %d",port);
-
- TRY {
- meas = gras_socket_meas_accept(measMaster);
- DEBUG0("saturation handshake answered");
- } CATCH(e) {
- gras_socket_close(measMaster);
- RETHROW0("Error during saturation handshake: %s");
- }
- INFO0("Accepted");
+ free(request);
gras_msg_rpc_async_wait(ctx,&request);
+ meas=gras_socket_client_ext( to_name, request->host.port,
+ 0 /*bufsize: auto*/,
+ 1 /*meas: true*/);
+ free(request);
+
+ gras_socket_close(peer_cmd);
+ INFO2("Saturation from %s to %s started",gras_os_myname(),to_name);
/* Start experiment */
start=gras_os_time();
saturate_further=1;
memset(&msg_got,0,sizeof(msg_got)); /* may be overprotectiv here */
}
- xbt_ex_free(e);
+ xbt_ex_free(&e);
}
/* Check whether the experiment has to be terminated by now */
elapsed=gras_os_time()-start;
+ VERB2("elapsed %f duration %f",elapsed, duration);
} while (saturate_further && elapsed < duration);
+ INFO2("Saturation from %s to %s stopped",gras_os_myname(),to_name);
bw = ((double)(packet_sent*msg_size)) / elapsed;
if (elapsed_res)
free(answer);
}
- gras_socket_close(measMaster);
gras_socket_close(meas);
}
/* Sender will saturate link to us */
static int amok_bw_cb_sat_begin(gras_msg_cb_ctx_t ctx, void *payload){
- gras_socket_t from=gras_msg_cb_ctx_from(ctx);
sat_request_t request=*(sat_request_t*)payload;
+ sat_request_t answer=xbt_new0(s_sat_request_t,1);
volatile int saturate_further = 1;
xbt_ex_t e;
+ gras_socket_t measMaster=NULL,meas=NULL;
+
+ int port=6000;
+ while (port <= 10000 && measMaster == NULL) {
+ TRY {
+ measMaster = gras_socket_server_ext(port,
+ 0 /*bufsize: auto*/,
+ 1 /*meas: true*/);
+ } CATCH(e) {
+ measMaster = NULL;
+ if (port < 10000)
+ xbt_ex_free(&e);
+ else
+ RETHROW0("Error encountered while opening a measurement server socket: %s");
+ }
+ if (measMaster == NULL)
+ port++; /* prepare for a new loop */
+ }
+ answer->host.port=port;
+
+ gras_msg_rpcreturn(60, ctx, &answer);
+ free(answer);
gras_os_sleep(5); /* Wait for the accept */
- gras_socket_t sock=gras_socket_client_ext( gras_socket_peer_name(from),
- request->host.port,
- 0 /*bufsize: auto*/,
- 1 /*meas: true*/);
- gras_msg_rpcreturn(60, ctx, NULL);
+
+ TRY {
+ meas = gras_socket_meas_accept(measMaster);
+ DEBUG0("saturation handshake answered");
+ } CATCH(e) {
+ gras_socket_close(measMaster);
+ RETHROW0("Error during saturation handshake: %s");
+ }
while (saturate_further) {
TRY {
- gras_socket_meas_send(sock,120,request->msg_size,request->msg_size);
+ gras_socket_meas_recv(meas,120,request->msg_size,request->msg_size);
} CATCH(e) {
saturate_further = 0;
- xbt_ex_free(e);
+ xbt_ex_free(&e);
}
}
- gras_socket_close(sock);
+ INFO1("Saturation stopped on %s",gras_os_myname());
+ gras_socket_close(meas);
+ if (gras_if_RL()) /* On SG, accepted=master */
+ gras_socket_close(measMaster);
free(request);
return 1;
}
/**
* @brief Ask 'from_name:from_port' to stop any saturation experiments
- * @from_name: Name of the host we are asking to do a experiment with (to_name:to_port)
- * @from_port: port on which the process we are asking for an experiment is listening
- * @time: the duration of the experiment
- * @bw: the achieved bandwidth
+ * @param from_name: Name of the host we are asking to do a experiment with (to_name:to_port)
+ * @param from_port: port on which the process we are asking for an experiment is listening
+ * @param time: the duration of the experiment
+ * @param bw: the achieved bandwidth
*
*/
void amok_bw_saturate_stop(const char* from_name,unsigned int from_port,