3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
15 static short _amok_bw_initialized = 0;
18 void amok_bw_init(void) {
19 gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
23 if (! _amok_bw_initialized) {
25 /* Build the datatype descriptions */
26 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27 gras_datadesc_struct_append(bw_request_desc,"host",
28 gras_datadesc_by_name("xbt_host_t"));
29 gras_datadesc_struct_append(bw_request_desc,"buf_size",
30 gras_datadesc_by_name("unsigned long int"));
31 gras_datadesc_struct_append(bw_request_desc,"exp_size",
32 gras_datadesc_by_name("unsigned long int"));
33 gras_datadesc_struct_append(bw_request_desc,"msg_size",
34 gras_datadesc_by_name("unsigned long int"));
35 gras_datadesc_struct_close(bw_request_desc);
36 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
38 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
39 gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
40 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43 gras_datadesc_struct_close(bw_res_desc);
44 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
46 sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47 gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48 gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49 gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50 gras_datadesc_struct_close(sat_request_desc);
51 sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
53 /* Register the bandwidth messages */
54 gras_msgtype_declare("BW request", bw_request_desc);
55 gras_msgtype_declare("BW result", bw_res_desc);
56 gras_msgtype_declare("BW handshake", bw_request_desc);
57 gras_msgtype_declare("BW handshake ACK", bw_request_desc);
59 /* Register the saturation messages */
60 gras_msgtype_declare("SAT start", sat_request_desc);
61 gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
62 gras_msgtype_declare("SAT begin", sat_request_desc);
63 gras_msgtype_declare("SAT begun", gras_datadesc_by_name("amok_remoterr_t"));
64 gras_msgtype_declare("SAT end", NULL);
65 gras_msgtype_declare("SAT ended", gras_datadesc_by_name("amok_remoterr_t"));
66 gras_msgtype_declare("SAT stop", NULL);
67 gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
70 /* Register the callbacks */
71 gras_cb_register(gras_msgtype_by_name("BW request"),
72 &amok_bw_cb_bw_request);
73 gras_cb_register(gras_msgtype_by_name("BW handshake"),
74 &amok_bw_cb_bw_handshake);
76 gras_cb_register(gras_msgtype_by_name("SAT start"),
77 &amok_bw_cb_sat_start);
78 gras_cb_register(gras_msgtype_by_name("SAT begin"),
79 &amok_bw_cb_sat_begin);
81 _amok_bw_initialized =1;
84 void amok_bw_exit(void) {
85 if (! _amok_bw_initialized)
88 gras_cb_unregister(gras_msgtype_by_name("BW request"),
89 &amok_bw_cb_bw_request);
90 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
91 &amok_bw_cb_bw_handshake);
93 gras_cb_unregister(gras_msgtype_by_name("SAT start"),
94 &amok_bw_cb_sat_start);
95 gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
96 &amok_bw_cb_sat_begin);
98 _amok_bw_initialized = 0;
103 /* ***************************************************************************
105 * ***************************************************************************/
108 * \brief bandwidth measurement between localhost and @peer
110 * Results are reported in last args, and sizes are in kb.
112 xbt_error_t amok_bw_test(gras_socket_t peer,
113 unsigned long int buf_size,
114 unsigned long int exp_size,
115 unsigned long int msg_size,
116 /*OUT*/ double *sec, double *bw) {
118 /* Measurement sockets for the experiments */
119 gras_socket_t measMasterIn,measIn,measOut;
122 bw_request_t request,request_ack;
124 for (port = 5000, errcode = system_error;
125 errcode == system_error && port < 10000;
126 errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
127 if (errcode != no_error) {
128 ERROR1("Error %s encountered while opening a measurement socket",
129 xbt_error_name(errcode));
133 request=xbt_new0(s_bw_request_t,1);
134 request->buf_size=buf_size*1024;
135 request->exp_size=exp_size*1024;
136 request->msg_size=msg_size*1024;
137 request->host.name = NULL;
138 request->host.port = gras_socket_my_port(measMasterIn);
139 VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)",
140 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
141 buf_size,request->buf_size);
143 if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
144 ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
147 TRY(gras_socket_meas_accept(measMasterIn,&measIn));
149 if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
150 NULL,&request_ack))) {
151 ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
152 xbt_error_name(errcode));
156 /* FIXME: What if there is a remote error? */
158 if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
159 request_ack->host.port,
160 request->buf_size,1,&measOut))) {
161 ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
162 xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
165 DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
168 TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
169 TRY(gras_socket_meas_recv(measIn,120,1,1));
172 ERROR1("Error %s encountered while sending the BW experiment.",
173 xbt_error_name(errcode));
174 gras_socket_close(measOut);
175 gras_socket_close(measMasterIn);
176 gras_socket_close(measIn);
179 *sec = gras_os_time() - *sec;
180 *bw = ((double)exp_size) / *sec;
184 if (measIn != measMasterIn)
185 gras_socket_close(measIn);
186 gras_socket_close(measMasterIn);
187 gras_socket_close(measOut);
192 /* Callback to the "BW handshake" message:
193 opens a server measurement socket,
194 indicate its port in an "BW handshaked" message,
195 receive the corresponding data on the measurement socket,
196 close the measurment socket
198 sizes are in byte (got converted from kb my expeditor)
200 int amok_bw_cb_bw_handshake(gras_socket_t expeditor,
202 gras_socket_t measMasterIn,measIn,measOut;
203 bw_request_t request=*(bw_request_t*)payload;
208 VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
209 gras_socket_peer_name(expeditor),request->host.port,
210 request->buf_size,request->exp_size,request->msg_size);
212 /* Build our answer */
213 answer = xbt_new0(s_bw_request_t,1);
215 for (port = 6000, errcode = system_error;
216 errcode == system_error;
217 errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
218 if (errcode != no_error) {
219 ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
220 /* FIXME: tell error to remote */
224 answer->buf_size=request->buf_size;
225 answer->exp_size=request->exp_size;
226 answer->msg_size=request->msg_size;
227 answer->host.port=gras_socket_my_port(measMasterIn);
229 /* Don't connect asap to leave time to other side to enter the accept() */
230 if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
232 request->buf_size,1,&measOut))) {
233 ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d",
234 xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
235 /* FIXME: tell error to remote */
240 if ((errcode=gras_msg_send(expeditor,
241 gras_msgtype_by_name("BW handshake ACK"),
243 ERROR1("Error %s encountered while sending the answer.",
244 xbt_error_name(errcode));
245 gras_socket_close(measMasterIn);
246 gras_socket_close(measOut);
247 /* FIXME: tell error to remote */
250 TRY(gras_socket_meas_accept(measMasterIn,&measIn));
251 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
252 answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
254 TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
255 TRY(gras_socket_meas_send(measOut,120,1,1));
258 ERROR1("Error %s encountered while receiving the experiment.",
259 xbt_error_name(errcode));
260 gras_socket_close(measMasterIn);
261 gras_socket_close(measIn);
262 gras_socket_close(measOut);
263 * FIXME: tell error to remote ? *
267 if (measIn != measMasterIn)
268 gras_socket_close(measMasterIn);
269 gras_socket_close(measIn);
270 gras_socket_close(measOut);
273 DEBUG0("BW experiment done.");
278 * \brief request a bandwidth measurement between two remote hosts
280 * Results are reported in last args, and sizes are in kb.
282 xbt_error_t amok_bw_request(const char* from_name,unsigned int from_port,
283 const char* to_name,unsigned int to_port,
284 unsigned long int buf_size,
285 unsigned long int exp_size,
286 unsigned long int msg_size,
287 /*OUT*/ double *sec, double*bw) {
292 bw_request_t request;
295 request=xbt_new0(s_bw_request_t,1);
296 request->buf_size=buf_size;
297 request->exp_size=exp_size;
298 request->msg_size=msg_size;
300 request->host.name = (char*)to_name;
301 request->host.port = to_port;
303 TRY(gras_socket_client(from_name,from_port,&sock));
304 TRY(gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request));
307 TRY(gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result));
309 *sec=result->seconds;
312 VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
313 from_name,from_port, to_name,to_port,
316 gras_socket_close(sock);
320 int amok_bw_cb_bw_request(gras_socket_t expeditor,
322 /* specification of the test to run, and our answer */
323 bw_request_t request = *(bw_request_t*)payload;
324 bw_res_t result = xbt_new0(s_bw_res_t,1);
326 TRY(gras_socket_client(request->to_name,request->to_port,&peer));
327 TRY(amok_bw_test(peer,
328 request->buf_size,request->exp_size,request->msg_size,
329 &(result->sec),&(result->bw)));
330 gras_socket_close(peer);
334 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
335 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
337 unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
338 unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
339 unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
344 if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
345 &(res[0].value),&(res[1].value) ))) {
347 "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
348 __FILE__,__LINE__,xbt_error_name(error->errcode));
349 strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
350 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
355 res[0].timestamp = (unsigned int) gras_time();
356 res[1].timestamp = (unsigned int) gras_time();
357 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
364 int amok_bw_cb_sat_start(gras_socket_t expeditor,
366 CRITICAL0("amok_bw_cb_sat_start; not implemented");
369 int amok_bw_cb_sat_begin(gras_socket_t expeditor,
371 CRITICAL0("amok_bw_cb_sat_begin: not implemented");
375 /* ***************************************************************************
377 * ***************************************************************************/
379 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
380 const char* to_name,unsigned int to_port,
381 unsigned int msgSize, unsigned int timeout) {
390 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
391 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
392 __FILE__,__LINE__,xbt_error_name(errcode));
395 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
396 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
397 fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
398 gras_sock_close(sock);
402 request->timeout=timeout;
403 request->msgSize=msgSize;
405 strcpy(target->host,to_name);
406 target->port=to_port;
408 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2,
411 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
412 __FILE__,__LINE__,xbt_error_name(errcode));
413 gras_sock_close(sock);
416 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
417 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
418 __FILE__,__LINE__,xbt_error_name(errcode));
419 gras_sock_close(sock);
423 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
424 fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
425 __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
426 gras_msg_free(answer);
427 gras_sock_close(sock);
431 gras_msg_free(answer);
432 gras_sock_close(sock);
436 int grasbw_cbSatStart(gras_msg_t *msg) {
440 double start; /* time to timeout */
442 /* specification of the test to run */
443 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
444 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
446 unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
447 unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
448 unsigned int raw_port;
456 fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
457 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
458 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
459 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
462 /* Negociate the saturation with the peer */
463 if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
464 fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
465 xbt_error_name(errcode));
466 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
467 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
468 errcode,"Cannot contact peer.\n");
471 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
472 fprintf(stderr,"cbSatStart(): Malloc error\n");
473 gras_sock_close(sock);
474 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
475 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
476 malloc_error,"Cannot build request.\n");
480 request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
481 request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
483 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1,
485 fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
486 xbt_error_name(errcode));
487 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
488 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
489 errcode,"Cannot send request.\n");
490 gras_sock_close(sock);
494 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
495 fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
496 xbt_error_name(errcode));
497 gras_sock_close(sock);
499 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
500 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
502 "Cannot receive the ACK.\n");
506 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
507 fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
508 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
510 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
511 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
513 "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
514 gras_msg_free(answer);
515 gras_sock_close(sock);
519 raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
521 if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
522 fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
523 xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
525 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
526 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
527 errcode,"Cannot open raw socket.\n");
528 gras_sock_close(sock);
532 /* send a train of data before repporting that XP is started */
533 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
534 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
535 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
536 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
537 errcode,"Cannot raw send.\n");
538 gras_sock_close(sock);
539 gras_rawsock_close(raw);
543 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
544 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
545 no_error,"Saturation started");
546 gras_msg_free(answer);
549 /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
551 while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error &&
552 gras_time()-start < timeout) {
553 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
554 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
555 /* our error message do not interess anyone. SAT_STOP will do nothing. */
556 gras_sock_close(sock);
557 gras_rawsock_close(raw);
561 if (gras_time()-start > timeout) {
562 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
563 gras_sock_close(sock);
564 gras_rawsock_close(raw);
568 /* Handle the SAT_STOP which broke the previous while */
570 if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
571 fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
573 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
574 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
575 errcode,"Sending SAT_END to peer failed.\n");
576 gras_sock_close(sock);
577 gras_rawsock_close(raw);
581 if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
582 fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
584 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
585 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
586 errcode,"Receiving SAT_ENDED from peer failed.\n");
587 gras_sock_close(sock);
588 gras_rawsock_close(raw);
591 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
592 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
595 gras_sock_close(sock);
596 gras_rawsock_close(raw);
597 gras_msg_free(answer);
603 int grasbw_cbSatBegin(gras_msg_t *msg) {
606 double start; /* timer */
608 unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
609 unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
614 if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
615 !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
616 fprintf(stderr,"cbSatBegin(): Malloc error\n");
617 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
618 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
619 malloc_error,"Malloc error");
623 if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) {
624 fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
625 xbt_error_name(errcode));
626 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
627 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
628 errcode,"Cannot open raw socket");
631 request->port=gras_rawsock_get_peer_port(raw);
632 request->msgSize=msgSize;
633 error->errcode=no_error;
634 error->errmsg[0]='\0';
635 if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
638 fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
639 xbt_error_name(errcode));
645 while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
646 gras_time() - start < timeout) {
647 errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
648 if (errcode != timeout_error && errcode != no_error) {
649 fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
650 /* our error message do not interess anyone. SAT_END will do nothing. */
651 /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
655 if (gras_time()-start > timeout) {
656 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
657 gras_rawsock_close(raw);
661 grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
662 "cbSatBegin: Cannot send SAT_ENDED.\n",
664 gras_rawsock_close(raw);
669 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
670 const char* to_name,unsigned int to_port) {
675 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
676 fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
677 xbt_error_name(errcode));
681 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
682 fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
683 xbt_error_name(errcode));
684 gras_sock_close(sock);
688 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
689 fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
690 xbt_error_name(errcode));
691 gras_sock_close(sock);
695 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
696 fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
697 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
698 gras_msg_free(answer);
699 gras_sock_close(sock);
703 gras_msg_free(answer);
704 gras_sock_close(sock);