- /* Handle the SAT_STOP which broke the previous while */
-
- if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
- fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
-
- grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
- "cbSatStart: Severe error: Cannot send error status to requester!!\n",
- errcode,"Sending SAT_END to peer failed.\n");
- gras_sock_close(sock);
- gras_rawsock_close(raw);
- return 1;
- }
-
- if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
- fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
-
- grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
- "cbSatStart: Severe error: Cannot send error status to requester!!\n",
- errcode,"Receiving SAT_ENDED from peer failed.\n");
- gras_sock_close(sock);
- gras_rawsock_close(raw);
- return 1;
+ /* Launch the saturation */
+
+ ctx =
+ gras_msg_rpc_async_call(peer_cmd, 60, "amok_bw_sat begin", &request);
+ free(request);
+ gras_msg_rpc_async_wait(ctx, &request);
+ meas = gras_socket_client_ext(to_name, request->peer.port,
+ 0 /*bufsize: auto */ ,
+ 1 /*meas: true */ );
+
+ free(request);
+
+ gras_socket_close(peer_cmd);
+ XBT_INFO("Saturation(%s:%d->%s:%u) started", gras_os_myname(),
+ gras_os_myport(), to_name, to_port);
+
+ /* Start experiment */
+ start = gras_os_time();
+
+ do {
+ /* do send it */
+ xbt_socket_meas_send(meas, 120, msg_size, 1);
+ packet_sent++;
+
+ /* Check whether someone asked us to stop saturation */
+ saturate_further = 0;
+ TRY {
+ gras_msg_wait_ext(0 /*no wait */ , "amok_bw_sat stop",
+ NULL /* accept any sender */ ,
+ NULL, NULL, /* No specific filter */
+ &msg_got);
+ }
+ CATCH(e) {
+ if (e.category == timeout_error) {
+ saturate_further = 1;
+ memset(&msg_got, 0, sizeof(msg_got)); /* may be overprotectiv here */
+ }
+ xbt_ex_free(e);
+ }
+
+ /* Check whether the experiment has to be terminated by now */
+ elapsed = gras_os_time() - start;
+ XBT_DEBUG("elapsed %f duration %f (msg_size=%u)", elapsed, duration,
+ msg_size);
+
+ } while (saturate_further && (duration == 0 || elapsed < duration));
+
+ bw = ((double) (packet_sent * msg_size)) / elapsed;
+
+ if (elapsed_res)
+ *elapsed_res = elapsed;
+ if (bw_res)
+ *bw_res = bw;
+
+ /* If someone stopped us, inform him about the achieved bandwidth */
+ if (msg_got.expe) {
+ bw_res_t answer = xbt_new(s_bw_res_t, 1);
+ s_gras_msg_cb_ctx_t ctx;
+
+ XBT_INFO("Saturation from %s:%d to %s:%u stopped by %s:%d",
+ gras_os_myname(), gras_os_myport(), to_name, to_port,
+ xbt_socket_peer_name(msg_got.expe),
+ xbt_socket_peer_port(msg_got.expe));
+ answer->timestamp = gras_os_time();
+ answer->sec = elapsed;
+ answer->bw = bw;
+
+ ctx.expeditor = msg_got.expe;
+ ctx.ID = msg_got.ID;
+ ctx.msgtype = msg_got.type;
+
+ gras_msg_rpcreturn(60, &ctx, &answer);
+ free(answer);
+ } else {
+ XBT_INFO
+ ("Saturation from %s:%d to %s:%u elapsed after %f sec (achieving %f kb/s)",
+ gras_os_myname(), gras_os_myport(), to_name, to_port, elapsed,
+ bw / 1024.0);