3 /* gras_bandwidth - GRAS mechanism to do bandwidth tests between two hosts */
5 /* Authors: Martin Quinson */
6 /* Copyright (C) 2003 the OURAGAN project. */
8 /* This program is free software; you can redistribute it and/or modify it
9 under the terms of the license (GNU LGPL) which comes with this package. */
20 * Description of a BW experiment (payload when asking a host to do a BW experiment with us)
26 unsigned int port; /* raw socket to use */
/* Member-by-member descriptor of BwExp_t: one unsigned int each for bufSize,
   expSize, msgSize and port, in that order. It is passed to
   gras_msgtype_register() for the BW request / handshake messages below.
   NOTE(review): presumably used by the GRAS message layer to (de)serialize the
   struct -- confirm against the DataDescriptor documentation. */
29 static const DataDescriptor BwExp_Desc[] =
30 { SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(BwExp_t,bufSize)),
31 SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(BwExp_t,expSize)),
32 SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(BwExp_t,msgSize)),
33 SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(BwExp_t,port))};
39 * Description of a saturation experiment (payload when asking a host to do a saturation experiment with us)
44 unsigned int port; /* raw socket to use */
/* Member-by-member descriptor of SatExp_t: one unsigned int each for msgSize,
   timeout and port, in that order. It is passed to gras_msgtype_register()
   for the SAT_START, SAT_BEGIN and SAT_BEGUN messages below.
   NOTE(review): presumably used by the GRAS message layer to (de)serialize the
   struct -- confirm against the DataDescriptor documentation. */
47 static const DataDescriptor SatExp_Desc[] =
48 { SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(SatExp_t,msgSize)),
49 SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(SatExp_t,timeout)),
50 SIMPLE_MEMBER(UNSIGNED_INT_TYPE, 1, offsetof(SatExp_t,port))};
54 /* Prototypes of local callbacks */
/* grasbw_register_messages() attaches them to the BW_HANDSHAKE, BW_REQUEST,
   SAT_START and SAT_BEGIN message types respectively. Each receives the
   incoming message; the meaning of the int return value is not visible in
   this listing. */
55 int grasbw_cbBWHandshake(gras_msg_t *msg);
56 int grasbw_cbBWRequest(gras_msg_t *msg);
58 int grasbw_cbSatStart(gras_msg_t *msg);
59 int grasbw_cbSatBegin(gras_msg_t *msg);
/* Register every message type and callback used by the bandwidth and
   saturation experiments.
   The registrations are chained with ||, so the first failing call stops the
   chain and leaves its error code in errcode; the error name is then printed
   on stderr.
   NOTE(review): the declaration of errcode, the opening of the first
   condition and the return statements are not present in this listing, so
   the exact return values cannot be confirmed from here. */
62 gras_error_t grasbw_register_messages(void) {
/* Bandwidth experiment messages. The integer after the message name matches
   the number of (descriptor, length) pairs that follow it in each call. */
67 (errcode=gras_msgtype_register(GRASMSG_BW_REQUEST,"BW request",2,
68 msgHostDesc,msgHostLen,
69 BwExp_Desc,BwExp_Len)) ||
70 (errcode=gras_msgtype_register(GRASMSG_BW_RESULT, "BW result",2,
71 msgErrorDesc,msgErrorLen,
72 msgResultDesc,msgResultLen /* first=seconds, second=bw */)) ||
74 (errcode=gras_msgtype_register(GRASMSG_BW_HANDSHAKE, "BW handshake",1,
75 BwExp_Desc,BwExp_Len)) ||
76 (errcode=gras_msgtype_register(GRASMSG_BW_HANDSHAKED, "BW handshake ACK",1,
77 BwExp_Desc,BwExp_Len)) ||
/* Saturation experiment messages: each request (START, BEGIN, END, STOP) has
   a matching acknowledgement (STARTED, BEGUN, ENDED, STOPPED) carrying a
   msgError payload. */
80 (errcode=gras_msgtype_register(GRASMSG_SAT_START,"SAT_START",2,
81 msgHostDesc,msgHostLen,
82 SatExp_Desc,SatExp_Len)) ||
83 (errcode=gras_msgtype_register(GRASMSG_SAT_STARTED, "SAT_STARTED",1,
84 msgErrorDesc,msgErrorLen)) ||
86 (errcode=gras_msgtype_register(GRASMSG_SAT_BEGIN,"SAT_BEGIN",1,
87 SatExp_Desc,SatExp_Len)) ||
88 (errcode=gras_msgtype_register(GRASMSG_SAT_BEGUN, "SAT_BEGUN",2,
89 msgErrorDesc,msgErrorLen,
90 SatExp_Desc,SatExp_Len)) ||
92 (errcode=gras_msgtype_register(GRASMSG_SAT_END,"SAT_END",0)) ||
93 (errcode=gras_msgtype_register(GRASMSG_SAT_ENDED, "SAT_ENDED",1,
94 msgErrorDesc,msgErrorLen)) ||
96 (errcode=gras_msgtype_register(GRASMSG_SAT_STOP,"SAT_STOP",0)) ||
97 (errcode=gras_msgtype_register(GRASMSG_SAT_STOPPED, "SAT_STOPPED",1,
98 msgErrorDesc,msgErrorLen)) )
101 fprintf(stderr,"GRASBW: Unable register the messages (got error %s)\n",
102 gras_error_name(errcode));
/* Attach the local callbacks to the four messages this module answers.
   NOTE(review): the meaning of the -1 argument is not visible here. */
106 if ((errcode=gras_cb_register(GRASMSG_BW_HANDSHAKE,-1,&grasbw_cbBWHandshake)) ||
107 (errcode=gras_cb_register(GRASMSG_BW_REQUEST,-1,&grasbw_cbBWRequest)) ||
109 (errcode=gras_cb_register(GRASMSG_SAT_START,-1,&grasbw_cbSatStart)) ||
110 (errcode=gras_cb_register(GRASMSG_SAT_BEGIN,-1,&grasbw_cbSatBegin)) ) {
112 fprintf(stderr,"GRASBW: Unable register the callbacks (got error %s)\n",
113 gras_error_name(errcode));
120 /* ***************************************************************************
122 * ***************************************************************************/
123 /* Function to do a test from local to given host */
/* Parameters:
     to_name, to_port  peer to contact with a regular GRAS socket
     bufSize           size handed to the raw socket open calls
     expSize, msgSize  sizes handed to gras_rawsock_send() for the experiment
     sec (OUT)         elapsed time, as a difference of gras_time() values
     bw  (OUT)         expSize / *sec / (1024*1024)
   Protocol, as far as the visible lines show: open a socket to the peer, open
   a local raw server socket, send BW_HANDSHAKE with the experiment
   description and our raw port, wait for BW_HANDSHAKED, open a raw client
   socket to the port announced in the answer, send the experiment data and
   wait for a small acknowledgement on the raw server socket.
   NOTE(review): several lines (declarations of sock/answer/request, some call
   arguments, return statements) are not present in this listing. */
124 gras_error_t grasbw_test(const char*to_name,unsigned int to_port,
125 unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
126 /*OUT*/ double *sec, double *bw) {
127 gras_rawsock_t *rawIn,*rawOut;
129 gras_error_t errcode;
133 if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
134 fprintf(stderr,"grasbw_test(): Error %s encountered while contacting peer\n",
135 gras_error_name(errcode));
/* Raw socket on which the peer will send its acknowledgement.
   NOTE(review): 6666 and 8000 look like the bounds of a port range -- confirm
   against gras_rawsock_server_open(). */
138 if ((errcode=gras_rawsock_server_open(6666,8000,bufSize,&rawIn))) {
139 fprintf(stderr,"grasbw_test(): Error %s encountered while opening a raw socket\n",
140 gras_error_name(errcode));
/* NOTE(review): on the visible lines this error path, like the next two,
   closes sock but not rawIn, which is open at this point -- possible leak,
   check the lines missing from the listing. */
144 if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t)))) {
145 fprintf(stderr,"grasbw_test(): Malloc error\n");
146 gras_sock_close(sock);
149 request->bufSize=bufSize;
150 request->expSize=expSize;
151 request->msgSize=msgSize;
/* Tell the peer which port our raw server socket uses. */
152 request->port=gras_rawsock_get_peer_port(rawIn);
154 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_HANDSHAKE, 1,
156 fprintf(stderr,"grasbw_test(): Error %s encountered while sending the request.\n",
157 gras_error_name(errcode));
158 gras_sock_close(sock);
/* NOTE(review): 60 is presumably a timeout in seconds -- confirm. */
161 if ((errcode=gras_msg_wait(60,GRASMSG_BW_HANDSHAKED,&answer))) {
162 fprintf(stderr,"grasbw_test(): Error %s encountered while waiting for the answer.\n",
163 gras_error_name(errcode));
164 gras_sock_close(sock);
/* The answer carries the port of the raw server socket opened by the peer. */
167 if((errcode=gras_rawsock_client_open(to_name,gras_msg_ctn(answer,0,0,BwExp_t).port,
169 fprintf(stderr,"grasbw_test(): Error %s encountered while opening the raw socket to %s:%d\n",
170 gras_error_name(errcode),to_name,gras_msg_ctn(answer,0,0,BwExp_t).port);
/* Send the experiment, then wait for the peer's acknowledgement on rawIn.
   NOTE(review): this error path closes both raw sockets, but on the visible
   lines neither sock nor answer is released. */
175 if ((errcode=gras_rawsock_send(rawOut,expSize,msgSize)) ||
176 (errcode=gras_rawsock_recv(rawIn,1,1,120))) {
177 fprintf(stderr,"grasbw_test(): Error %s encountered while sending the experiment.\n",
178 gras_error_name(errcode));
179 gras_rawsock_close(rawOut);
180 gras_rawsock_close(rawIn);
/* NOTE(review): this relies on *sec already holding the start time; that
   assignment is not present in this listing. */
183 *sec = gras_time() - *sec;
/* expSize per second divided by 1024*1024; the factor 8.0 is commented out,
   so the size is not converted to bits. */
184 *bw = ((double)expSize /* 8.0*/) / *sec / (1024.0 *1024.0);
186 gras_rawsock_close(rawIn);
187 gras_rawsock_close(rawOut);
188 gras_sock_close(sock);
189 gras_msg_free(answer);
193 /* Callback to the GRASMSG_BW_HANDSHAKE message:
194 opens a server raw socket,
195 indicate its port in an answer GRASMSG_BW_HANDSHAKED message,
196 receive the corresponding data on the raw socket,
/* Callback for GRASMSG_BW_HANDSHAKE (peer side of grasbw_test()).
   Reads the experiment description from the first payload of msg, opens a raw
   server socket and a raw client socket back to the sender, answers with
   BW_HANDSHAKED carrying our raw port, receives the experiment data and sends
   a small acknowledgement on the raw client socket.
   NOTE(review): declarations, some call arguments and the return statements
   are not present in this listing. */
199 int grasbw_cbBWHandshake(gras_msg_t *msg) {
200 gras_rawsock_t *rawIn,*rawOut;
202 gras_error_t errcode;
/* Raw socket on which the experiment data will arrive. */
204 if ((errcode=gras_rawsock_server_open(6666,8000,gras_msg_ctn(msg,0,0,BwExp_t).bufSize,&rawIn))) {
205 fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while opening a raw socket\n",
206 gras_error_name(errcode));
/* Raw socket back to the requester, on the port it announced in the handshake.
   NOTE(review): if this fails, rawIn is already open; whether it gets closed
   depends on lines missing from the listing. */
209 if ((errcode=gras_rawsock_client_open(gras_sock_get_peer_name(msg->sock),gras_msg_ctn(msg,0,0,BwExp_t).port,
210 gras_msg_ctn(msg,0,0,BwExp_t).bufSize,&rawOut))) {
211 fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while opening a raw socket\n",
212 gras_error_name(errcode));
215 if (!(ans=(BwExp_t *)malloc(sizeof(BwExp_t)))) {
216 fprintf(stderr,"grasbw_cbHandshake(): Malloc error.\n");
217 gras_rawsock_close(rawIn);
218 gras_rawsock_close(rawOut);
/* The answer echoes the requested sizes and adds the port of our raw server
   socket. */
221 ans->bufSize=gras_msg_ctn(msg,0,0,BwExp_t).bufSize;
222 ans->expSize=gras_msg_ctn(msg,0,0,BwExp_t).expSize;
223 ans->msgSize=gras_msg_ctn(msg,0,0,BwExp_t).msgSize;
224 ans->port=gras_rawsock_get_peer_port(rawIn);
225 // fprintf(stderr,"grasbw_cbHandshake. bufSize=%d expSize=%d msgSize=%d port=%d\n",
226 // ans->bufSize,ans->expSize,ans->msgSize,ans->port);
228 if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_BW_HANDSHAKED, 1,
230 fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while sending the answer.\n",
231 gras_error_name(errcode));
232 gras_rawsock_close(rawIn);
233 gras_rawsock_close(rawOut);
/* Receive the experiment data, then acknowledge it on rawOut; grasbw_test()
   waits for that acknowledgement before stopping its timer. */
237 if ((errcode=gras_rawsock_recv(rawIn,
238 gras_msg_ctn(msg,0,0,BwExp_t).expSize,
239 gras_msg_ctn(msg,0,0,BwExp_t).msgSize,
241 (errcode=gras_rawsock_send(rawOut,1,1))) {
242 fprintf(stderr,"grasbw_cbHandshake(): Error %s encountered while receiving the experiment.\n",
243 gras_error_name(errcode));
244 gras_rawsock_close(rawIn);
245 gras_rawsock_close(rawOut);
249 gras_rawsock_close(rawIn);
250 gras_rawsock_close(rawOut);
254 /* function to request a BW test between two external hosts */
/* Parameters:
     from_name, from_port  host contacted by this function; it receives the
                           BW_REQUEST message
     to_name, to_port      target copied into the msgHost payload
     bufSize, expSize, msgSize  experiment description copied into the BwExp
                           payload
     sec, bw (OUT)         values read from the two msgResult entries of the
                           BW_RESULT answer
   NOTE(review): declarations, some call arguments and the return statements
   are not present in this listing. */
255 gras_error_t grasbw_request(const char* from_name,unsigned int from_port,
256 const char* to_name,unsigned int to_port,
257 unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
258 /*OUT*/ double *sec, double*bw) {
262 gras_error_t errcode;
267 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
268 fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
269 gras_error_name(errcode));
/* NOTE(review): if the first malloc succeeds and the second fails, request is
   not freed on the visible lines. The message below also names grasbw_test()
   although this is grasbw_request(). */
272 if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
273 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
274 fprintf(stderr,"grasbw_test(): Malloc error\n");
275 gras_sock_close(sock);
/* NOTE(review): request->port is not assigned on the visible lines before the
   request is sent. */
279 request->bufSize=bufSize;
280 request->expSize=expSize;
281 request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the size of target->host is not visible here,
   so a long to_name may overflow it -- confirm and bound the copy. */
282 strcpy(target->host,to_name);
283 target->port=to_port;
285 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2,
288 fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
289 gras_error_name(errcode));
290 gras_sock_close(sock);
/* NOTE(review): 240 is presumably a timeout in seconds -- confirm. */
293 if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
294 fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
295 gras_error_name(errcode));
296 gras_sock_close(sock);
/* The first payload of the answer is the error status reported by the peer. */
300 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
301 fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
302 gras_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
303 gras_msg_free(answer);
304 gras_sock_close(sock);
/* The second payload holds two results: entry 0 is the duration, entry 1 the
   bandwidth. */
308 // fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t));
309 *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
310 *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
312 gras_msg_free(answer);
313 gras_sock_close(sock);
/* Callback for GRASMSG_BW_REQUEST: runs grasbw_test() against the host named
   in the request and sends the outcome back as a BW_RESULT message on the
   socket the request came from.
   NOTE(review): declarations, some call arguments and the return statements
   are not present in this listing. */
317 int grasbw_cbBWRequest(gras_msg_t *msg) {
318 /* specification of the test to run */
/* Payload 0 is the target host, payload 1 the experiment description. */
319 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
320 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
322 unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
323 unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
324 unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
/* Two result slots: res[0] receives the duration, res[1] the bandwidth.
   NOTE(review): if the first malloc succeeds and the second fails, error is
   not freed on the visible lines. */
329 if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
330 !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
331 fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
335 if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
336 &(res[0].value),&(res[1].value) ))) {
338 "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
339 __FILE__,__LINE__,gras_error_name(error->errcode));
/* NOTE(review): strncpy() only NUL-terminates when the source is shorter than
   the bound; that holds here as long as ERRMSG_LEN exceeds the length of the
   literal -- ERRMSG_LEN is not visible in this listing. */
340 strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
341 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Success path: timestamp both results and send them back. The return value
   of gras_msg_new_and_send() is not checked on the visible lines. */
346 res[0].timestamp = (unsigned int) gras_time();
347 res[1].timestamp = (unsigned int) gras_time();
348 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
355 /* ***************************************************************************
357 * ***************************************************************************/
/* Ask the host from_name:from_port to start a saturation experiment towards
   to_name:to_port.
   Sends SAT_START with the target host and the experiment description
   (msgSize, timeout), then waits for SAT_STARTED and checks the error status
   it carries.
   NOTE(review): declarations, some call arguments and the return statements
   are not present in this listing. */
359 gras_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
360 const char* to_name,unsigned int to_port,
361 unsigned int msgSize, unsigned int timeout) {
363 gras_error_t errcode;
370 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
371 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
372 __FILE__,__LINE__,gras_error_name(errcode));
/* NOTE(review): if the first malloc succeeds and the second fails, request is
   not freed on the visible lines. */
375 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
376 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
377 fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
378 gras_sock_close(sock);
/* NOTE(review): request->port is not assigned on the visible lines. */
382 request->timeout=timeout;
383 request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the size of target->host is not visible here,
   so a long to_name may overflow it -- confirm and bound the copy. */
385 strcpy(target->host,to_name);
386 target->port=to_port;
388 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2,
391 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
392 __FILE__,__LINE__,gras_error_name(errcode));
393 gras_sock_close(sock);
/* NOTE(review): 120 is presumably a timeout in seconds -- confirm. */
396 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
397 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
398 __FILE__,__LINE__,gras_error_name(errcode));
399 gras_sock_close(sock);
/* The acknowledgement carries the error status of the remote side. */
403 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
404 fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
405 __FILE__,__LINE__,gras_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
406 gras_msg_free(answer);
407 gras_sock_close(sock);
411 gras_msg_free(answer);
412 gras_sock_close(sock);
/* Callback for GRASMSG_SAT_START: the sending side of a saturation experiment.
   Steps visible in this listing:
     1. contact the target host and send it SAT_BEGIN;
     2. wait for SAT_BEGUN, which carries an error status and the raw port
        opened by the target;
     3. open a raw client socket to that port and send a first train of data;
     4. report the outcome to the requester through grasRepportError() with
        SAT_STARTED (also used on success, with no_error);
     5. keep sending data until a SAT_STOP message arrives or the experiment
        timeout expires;
     6. on SAT_STOP, send SAT_END to the target, wait for SAT_ENDED and report
        to the requester with SAT_STOPPED.
   NOTE(review): declarations, some call arguments, closing braces and the
   return statements are not present in this listing. */
416 int grasbw_cbSatStart(gras_msg_t *msg) {
419 gras_error_t errcode;
420 double start; /* time to timeout */
422 /* specification of the test to run */
/* Payload 0 is the target host, payload 1 the saturation description. */
423 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
424 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
426 unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
427 unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
428 unsigned int raw_port;
/* Debug trace of the requester's socket fields. */
436 fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
437 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
438 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
439 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
442 /* Negotiate the saturation with the peer */
443 if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
444 fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
445 gras_error_name(errcode));
446 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
447 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
448 errcode,"Cannot contact peer.\n");
451 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
452 fprintf(stderr,"cbSatStart(): Malloc error\n");
453 gras_sock_close(sock);
454 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
455 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
456 malloc_error,"Cannot build request.\n");
/* Forward the experiment description to the target.
   NOTE(review): request->port is not assigned on the visible lines. */
460 request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
461 request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
463 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1,
465 fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
466 gras_error_name(errcode));
467 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
468 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
469 errcode,"Cannot send request.\n");
470 gras_sock_close(sock);
/* NOTE(review): 120 is presumably a timeout in seconds -- confirm. */
474 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
475 fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
476 gras_error_name(errcode));
477 gras_sock_close(sock);
479 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
480 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
482 "Cannot receive the ACK.\n");
/* The first payload of SAT_BEGUN is the error status of the target. */
486 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
487 fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
488 gras_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
/* NOTE(review): unlike the other calls, this one passes a '%s' format plus an
   extra argument; confirm that grasRepportError() accepts printf-style
   arguments. */
490 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
491 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
493 "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
494 gras_msg_free(answer);
495 gras_sock_close(sock);
/* The second payload of SAT_BEGUN gives the raw port opened by the target. */
499 raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
501 if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
502 fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
503 gras_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
505 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
506 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
507 errcode,"Cannot open raw socket.\n");
508 gras_sock_close(sock);
512 /* send a train of data before reporting that XP is started */
513 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
514 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",gras_error_name(errcode));
515 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
516 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
517 errcode,"Cannot raw send.\n");
518 gras_sock_close(sock);
519 gras_rawsock_close(raw);
/* Success is reported through the same helper, with no_error as status. */
523 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
524 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
525 no_error,"Saturation started");
526 gras_msg_free(answer);
529 /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
/* NOTE(review): start must hold the starting time here; its assignment is not
   present in this listing. gras_msg_wait() is given &msg, so a received
   SAT_STOP replaces the callback's msg pointer; the SAT_STOPPED reports below
   therefore go to the socket of the SAT_STOP message. */
531 while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error &&
532 gras_time()-start < timeout) {
533 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
534 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",gras_error_name(errcode));
535 /* nobody is interested in our error message. SAT_STOP will do nothing. */
536 gras_sock_close(sock);
537 gras_rawsock_close(raw);
/* Leaving the loop on timeout: release the sockets, nobody is notified. */
541 if (gras_time()-start > timeout) {
542 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
543 gras_sock_close(sock);
544 gras_rawsock_close(raw);
548 /* Handle the SAT_STOP which broke the previous while */
550 if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
551 fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
553 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
554 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
555 errcode,"Sending SAT_END to peer failed.\n");
556 gras_sock_close(sock);
557 gras_rawsock_close(raw);
/* NOTE(review): 60 is presumably a timeout in seconds -- confirm. */
561 if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
562 fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
564 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
565 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
566 errcode,"Receiving SAT_ENDED from peer failed.\n");
567 gras_sock_close(sock);
568 gras_rawsock_close(raw);
/* Final report to whoever asked for the stop, then release everything. */
571 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
572 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
575 gras_sock_close(sock);
576 gras_rawsock_close(raw);
577 gras_msg_free(answer);
/* Callback for GRASMSG_SAT_BEGIN: the receiving side of a saturation
   experiment.
   Opens a raw server socket, answers SAT_BEGUN with a no_error status and the
   port of that socket, then keeps receiving raw data until a SAT_END message
   arrives or the experiment timeout expires. On SAT_END it reports back with
   SAT_ENDED.
   NOTE(review): declarations, some call arguments, closing braces and the
   return statements are not present in this listing. */
583 int grasbw_cbSatBegin(gras_msg_t *msg) {
585 gras_error_t errcode;
586 double start; /* timer */
588 unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
589 unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
/* NOTE(review): if the first malloc succeeds and the second fails, request is
   not freed on the visible lines. */
594 if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
595 !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
596 fprintf(stderr,"cbSatBegin(): Malloc error\n");
597 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
598 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
599 malloc_error,"Malloc error");
/* Raw socket on which the saturating traffic will arrive.
   NOTE(review): 6666 and 8000 look like the bounds of a port range -- confirm. */
603 if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) {
604 fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
605 gras_error_name(errcode));
606 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
607 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
608 errcode,"Cannot open raw socket");
/* Build the SAT_BEGUN answer: empty error status plus our raw port.
   NOTE(review): request->timeout is not assigned on the visible lines before
   the answer is sent. */
611 request->port=gras_rawsock_get_peer_port(raw);
612 request->msgSize=msgSize;
613 error->errcode=no_error;
614 error->errmsg[0]='\0';
615 if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
618 fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
619 gras_error_name(errcode));
/* NOTE(review): start must hold the starting time here; its assignment is not
   present in this listing. gras_msg_wait() is given &msg, so a received
   SAT_END replaces the callback's msg pointer and SAT_ENDED is sent to the
   socket of that SAT_END message. */
625 while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
626 gras_time() - start < timeout) {
/* NOTE(review): the last argument (1) is presumably a receive timeout in
   seconds -- confirm. */
627 errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
628 if (errcode != timeout_error && errcode != no_error) {
629 fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",gras_error_name(errcode));
630 /* nobody is interested in our error message. SAT_END will do nothing. */
631 /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
/* Leaving the loop on timeout: release the raw socket, nobody is notified. */
635 if (gras_time()-start > timeout) {
636 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
637 gras_rawsock_close(raw);
/* SAT_END received: acknowledge with SAT_ENDED and release the raw socket. */
641 grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
642 "cbSatBegin: Cannot send SAT_ENDED.\n",
644 gras_rawsock_close(raw);
/* Ask the host from_name:from_port to stop its running saturation experiment.
   Sends SAT_STOP (no payload), waits for SAT_STOPPED and checks the error
   status it carries.
   NOTE(review): to_name and to_port are not used on the visible lines.
   Declarations and the return statements are not present in this listing,
   and the end of the function lies beyond the last visible line. */
649 gras_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
650 const char* to_name,unsigned int to_port) {
651 gras_error_t errcode;
655 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
656 fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
657 gras_error_name(errcode));
661 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
662 fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
663 gras_error_name(errcode));
664 gras_sock_close(sock);
/* NOTE(review): 120 is presumably a timeout in seconds -- confirm. */
668 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
669 fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
670 gras_error_name(errcode));
671 gras_sock_close(sock);
/* The acknowledgement carries the error status of the remote side. */
675 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
676 fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
677 gras_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
678 gras_msg_free(answer);
679 gras_sock_close(sock);
683 gras_msg_free(answer);
684 gras_sock_close(sock);