+/**
+ * @brief Callback run when a peer asks this process to begin a saturation.
+ *
+ * @param ctx     context of the incoming RPC, used to send the answer back
+ * @param payload address of a sat_request_t giving the host to saturate
+ *                (name and port), the message size and the duration
+ * @return always 1 (presumably meaning "message handled" to the
+ *         dispatcher -- TODO confirm against the GRAS callback contract)
+ *
+ * The request and the host name it carries are freed here once the
+ * saturation is over (assumes both were heap-allocated for this callback
+ * by the messaging layer -- TODO confirm).
+ */
+static int amok_bw_cb_sat_start(gras_msg_cb_ctx_t ctx, void *payload)
+{
+ sat_request_t *incoming = (sat_request_t*)payload;
+ sat_request_t order = *incoming;
+
+ /* Answer the RPC with an empty reply first, so the answer is sent before
+  * the saturation runs. The 60 is presumably a timeout in seconds --
+  * confirm against gras_msg_rpcreturn. */
+ gras_msg_rpcreturn(60, ctx, NULL);
+
+ /* Run the experiment; the measured time and bandwidth are not wanted here */
+ amok_bw_saturate_begin(order->host.name, order->host.port,
+                        order->msg_size, order->duration,
+                        NULL, NULL);
+
+ /* Release the host name before the request that holds it */
+ free(order->host.name);
+ free(order);
+
+ return 1;
+}
+
+/**
+ * @brief Start saturating between the current process and the designated peer
+ *
+ * Note that the only way to interrupt this function before the timeout
+ * expires is to have a remote host call amok_bw_saturate_stop on this process.
+ */
+void amok_bw_saturate_begin(const char* to_name,unsigned int to_port,
+ unsigned int msg_size, double duration,
+ /*out*/ double *elapsed_res, double *bw_res) {
+
+ xbt_ex_t e;
+
+ gras_socket_t peer_cmd = gras_socket_client(to_name, to_port);
+ gras_msg_cb_ctx_t ctx;
+
+ gras_socket_t meas;
+
+ s_gras_msg_t msg_got;
+
+ unsigned int packet_sent=0;
+ double start,elapsed=-1; /* timer */
+ double bw;
+
+ volatile int saturate_further; /* boolean in the main loop */
+
+ /* Negotiate the saturation with the peer */
+ sat_request_t request = xbt_new(s_sat_request_t,1);
+
+ DEBUG2("Begin to saturate to %s:%d",to_name,to_port);
+ memset(&msg_got,0,sizeof(msg_got));
+
+ request->msg_size = msg_size;
+ request->duration = duration;
+ request->host.name = NULL;
+ request->host.port = 0;
+
+ ctx = gras_msg_rpc_async_call(peer_cmd, 60,
+ gras_msgtype_by_name("amok_bw_sat begin"),
+ &request);
+ free(request);
+ gras_msg_rpc_async_wait(ctx,&request);
+ meas=gras_socket_client_ext( to_name, request->host.port,
+ 0 /*bufsize: auto*/,
+ 1 /*meas: true*/);
+ free(request);
+
+ gras_socket_close(peer_cmd);
+ INFO2("Saturation from %s to %s started",gras_os_myname(),to_name);
+
+ /* Start experiment */
+ start=gras_os_time();
+
+ do {
+ /* do send it */
+ gras_socket_meas_send(meas,120,msg_size,msg_size);
+ packet_sent++;
+
+ /* Check whether someone asked us to stop saturation */
+ saturate_further = 0;
+ TRY {
+ gras_msg_wait_ext(0/*no wait*/,gras_msgtype_by_name("amok_bw_sat stop"),
+ NULL /* accept any sender */,
+ NULL, NULL, /* No specific filter */
+ &msg_got);
+ } CATCH(e) {
+ if (e.category == timeout_error) {
+ saturate_further=1;
+ memset(&msg_got,0,sizeof(msg_got)); /* may be overprotectiv here */
+ }
+ xbt_ex_free(&e);
+ }
+
+ /* Check whether the experiment has to be terminated by now */
+ elapsed=gras_os_time()-start;
+ VERB2("elapsed %f duration %f",elapsed, duration);
+
+ } while (saturate_further && elapsed < duration);
+
+ INFO2("Saturation from %s to %s stopped",gras_os_myname(),to_name);
+ bw = ((double)(packet_sent*msg_size)) / elapsed;
+
+ if (elapsed_res)
+ *elapsed_res = elapsed;
+ if (bw_res)
+ *bw_res = bw;
+
+ if (elapsed >= duration) {
+ INFO2("Saturation experiment terminated. Took %f sec (achieving %f kb/s)",
+ elapsed, bw/1024.0);