3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
/* Create the "bw" logging subcategory (declared relative to "amok") and use it
 * as the default category for the logging macros of this file. */
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
/* Module state flag: set to 1 at the end of amok_bw_init() and reset to 0 by
 * amok_bw_exit(). */
15 static short _amok_bw_initialized = 0;
/* Initialize the bandwidth module.
 *
 * While the module is not flagged as initialized, this builds the data
 * descriptions of the request and result structures, declares the bandwidth
 * and saturation message types, registers four message callbacks and finally
 * sets _amok_bw_initialized to 1.
 *
 * NOTE(review): several lines of this function are not visible in this view
 * (the embedded line numbers skip values); only the statements shown here are
 * described. */
18 void amok_bw_init(void) {
19 gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
/* Nothing below runs again once the flag has been raised. */
23 if (! _amok_bw_initialized) {
25 /* Build the datatype descriptions */
/* Bandwidth request: a host followed by three unsigned long sizes. */
26 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27 gras_datadesc_struct_append(bw_request_desc,"host",
28 gras_datadesc_by_name("xbt_host_t"));
29 gras_datadesc_struct_append(bw_request_desc,"buf_size",
30 gras_datadesc_by_name("unsigned long int"));
31 gras_datadesc_struct_append(bw_request_desc,"exp_size",
32 gras_datadesc_by_name("unsigned long int"));
33 gras_datadesc_struct_append(bw_request_desc,"msg_size",
34 gras_datadesc_by_name("unsigned long int"));
35 gras_datadesc_struct_close(bw_request_desc);
/* From here on bw_request_desc designates the "bw_request_t" reference type
 * built on top of the structure (the rest of the file handles bw_request_t
 * values through "->"). */
36 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Bandwidth result: remote error, timestamp, duration and bandwidth.
 * NOTE(review): the duration field is described as "seconds" here while
 * amok_bw_cb_bw_request() accesses result->sec, and that callback allocates
 * an "s_bw_res" whereas the description is named "s_bw_res_t" - confirm
 * against the structure declared in the private header. */
38 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
39 gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
40 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43 gras_datadesc_struct_close(bw_res_desc);
44 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Saturation request: a host, a message size and a timeout.
 * NOTE(review): this structure is named "s_sat_request_desc_t" whereas the
 * two others follow the "s_<name>_t" pattern - confirm the name is intended. */
46 sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47 gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48 gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49 gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50 gras_datadesc_struct_close(sat_request_desc);
51 sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
53 /* Register the bandwidth messages */
/* The request, the handshake and its ACK all carry a bw_request_t payload;
 * only the result uses bw_res_t. */
54 gras_msgtype_declare("BW request", bw_request_desc);
55 gras_msgtype_declare("BW result", bw_res_desc);
56 gras_msgtype_declare("BW handshake", bw_request_desc);
57 gras_msgtype_declare("BW handshake ACK", bw_request_desc);
59 /* Register the saturation messages */
/* "SAT end" and "SAT stop" are declared with a NULL payload description; the
 * four acknowledgements carry an amok_remoterr_t. */
60 gras_msgtype_declare("SAT start", sat_request_desc);
61 gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
62 gras_msgtype_declare("SAT begin", sat_request_desc);
63 gras_msgtype_declare("SAT begun", gras_datadesc_by_name("amok_remoterr_t"));
64 gras_msgtype_declare("SAT end", NULL);
65 gras_msgtype_declare("SAT ended", gras_datadesc_by_name("amok_remoterr_t"));
66 gras_msgtype_declare("SAT stop", NULL);
67 gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
70 /* Register the callbacks */
/* Only the four incoming requests get a callback; amok_bw_exit() unregisters
 * exactly the same four pairs. */
71 gras_cb_register(gras_msgtype_by_name("BW request"),
72 &amok_bw_cb_bw_request);
73 gras_cb_register(gras_msgtype_by_name("BW handshake"),
74 &amok_bw_cb_bw_handshake);
76 gras_cb_register(gras_msgtype_by_name("SAT start"),
77 &amok_bw_cb_sat_start);
78 gras_cb_register(gras_msgtype_by_name("SAT begin"),
79 &amok_bw_cb_sat_begin);
/* Mark the module as initialized so that a later call skips the setup. */
81 _amok_bw_initialized =1;
/* Shut the bandwidth module down: unregister the four callbacks installed by
 * amok_bw_init() and clear the initialization flag.
 *
 * NOTE(review): the statement controlled by the guard below is not visible in
 * this view; presumably an early return when the module was never
 * initialized - confirm. */
84 void amok_bw_exit(void) {
85 if (! _amok_bw_initialized)
/* Same (message type, callback) pairs as registered in amok_bw_init(). */
88 gras_cb_unregister(gras_msgtype_by_name("BW request"),
89 &amok_bw_cb_bw_request);
90 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
91 &amok_bw_cb_bw_handshake);
93 gras_cb_unregister(gras_msgtype_by_name("SAT start"),
94 &amok_bw_cb_sat_start);
95 gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
96 &amok_bw_cb_sat_begin);
/* Allow amok_bw_init() to run its setup again. */
98 _amok_bw_initialized = 0;
103 /* ***************************************************************************
105 * ***************************************************************************/
108 * \brief bandwidth measurement between localhost and @peer
110 * Results are reported in last args, and sizes are in kb.
/* Run a bandwidth experiment between this process and the process reached
 * through the socket peer.
 *
 * peer      already opened socket to the remote side
 * buf_size  socket buffer size, in kb
 * exp_size  total amount of data of the experiment, in kb
 * msg_size  size of each message, in kb
 * sec       OUT: duration of the experiment
 * bw        OUT: exp_size divided by that duration (so expressed in kb per
 *           time unit, exp_size being used as passed by the caller)
 *
 * The three sizes are multiplied by 1024 before being placed in the request
 * sent to the peer.
 *
 * NOTE(review): several lines of this function are not visible in this view
 * (declarations of errcode and port, return statements, closing braces, the
 * statement storing the start time into *sec); the comments below only
 * describe what is shown. */
112 xbt_error_t amok_bw_test(gras_socket_t peer,
113 unsigned long int buf_size,
114 unsigned long int exp_size,
115 unsigned long int msg_size,
116 /*OUT*/ double *sec, double *bw) {
118 /* Measurement sockets for the experiments */
119 gras_socket_t measMasterIn,measIn,measOut;
122 bw_request_t request,request_ack;
/* Look for a usable server port: the first attempt uses 5001 (pre-increment
 * of 5000) and attempts go on while the call reports system_error and port is
 * below 10000. The loop has no body; all the work is in its header.
 * NOTE(review): buf_size is passed here as received (kb) whereas the client
 * socket opened further down uses request->buf_size (multiplied by 1024) -
 * confirm which unit gras_socket_server_ext() expects. */
124 for (port = 5000, errcode = system_error;
125 errcode == system_error && port < 10000;
126 errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
127 if (errcode != no_error) {
128 ERROR1("Error %s encountered while opening a measurement socket",
129 xbt_error_name(errcode));
/* Build the handshake: sizes converted from kb to bytes, no host name, and the
 * port of the server socket opened above so the peer can connect back. */
133 request=xbt_new0(s_bw_request_t,1);
134 request->buf_size=buf_size*1024;
135 request->exp_size=exp_size*1024;
136 request->msg_size=msg_size*1024;
137 request->host.name = NULL;
138 request->host.port = gras_socket_my_port(measMasterIn);
/* NOTE(review): the text says "expsize" but the two values printed are
 * buf_size and request->buf_size; they are unsigned long printed with %ld. */
139 VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)",
140 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
141 buf_size,request->buf_size);
143 if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
144 ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
/* Accept the connection the peer opens back onto our server socket. */
147 TRY(gras_socket_meas_accept(measMasterIn,&measIn));
/* Wait for the acknowledgement (first argument 60, presumably a timeout in
 * seconds - confirm); it carries the port of the measurement socket of the peer. */
149 if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
150 NULL,&request_ack))) {
151 ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
152 xbt_error_name(errcode));
156 /* FIXME: What if there is a remote error? */
/* Open our outgoing measurement socket toward the port announced in the ACK. */
158 if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
159 request_ack->host.port,
160 request->buf_size,1,&measOut))) {
161 ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
162 xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
165 DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
/* The experiment itself: send on measOut using the experiment and message
 * sizes, then receive with sizes 1/1 on measIn (the handshake callback does
 * the mirror operations). */
168 TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
169 TRY(gras_socket_meas_recv(measIn,120,1,1));
/* Error report followed by the closing of the three sockets.
 * NOTE(review): the condition guarding these lines is not visible here. */
172 ERROR1("Error %s encountered while sending the BW experiment.",
173 xbt_error_name(errcode));
174 gras_socket_close(measOut);
175 gras_socket_close(measMasterIn);
176 gras_socket_close(measIn);
/* Elapsed time: relies on *sec already holding the start time (that
 * assignment is not visible in this view). */
179 *sec = gras_os_time() - *sec;
/* Uses the exp_size parameter (kb), not request->exp_size (bytes). */
180 *bw = ((double)exp_size) / *sec;
/* Regular cleanup: measIn is closed separately only when it is a distinct
 * socket from measMasterIn. */
184 if (measIn != measMasterIn)
185 gras_socket_close(measIn);
186 gras_socket_close(measMasterIn);
187 gras_socket_close(measOut);
192 /* Callback to the "BW handshake" message:
193 opens a server measurement socket,
connects a client measurement socket back to the port given by the expeditor,
194 indicates its own port in a "BW handshake ACK" message,
195 receives the corresponding data on the measurement socket,
196 closes the measurement sockets
198 sizes are in bytes (got converted from kb by the expeditor)
200 int amok_bw_cb_bw_handshake(gras_socket_t expeditor,
202 gras_socket_t measMasterIn,measIn,measOut;
203 bw_request_t request=*(bw_request_t*)payload;
208 VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
209 gras_socket_peer_name(expeditor),request->host.port,
210 request->buf_size,request->exp_size,request->msg_size);
212 /* Build our answer */
213 answer = xbt_new0(s_bw_request_t,1);
/* Look for a usable server port, first attempt on 6001 (pre-increment of
 * 6000). The loop has no body.
 * NOTE(review): unlike the equivalent loop of amok_bw_test(), no upper bound
 * on port is tested here, so attempts go on for as long as the call reports
 * system_error. */
215 for (port = 6000, errcode = system_error;
216 errcode == system_error;
217 errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
218 if (errcode != no_error) {
219 ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
220 /* FIXME: tell error to remote */
/* The answer echoes the requested sizes and announces the port of the server
 * socket opened above. */
224 answer->buf_size=request->buf_size;
225 answer->exp_size=request->exp_size;
226 answer->msg_size=request->msg_size;
227 answer->host.port=gras_socket_my_port(measMasterIn);
229 /* Don't connect asap to leave time to other side to enter the accept() */
/* Client measurement socket back to the expeditor.
 * NOTE(review): the port argument of this call is on a line not visible here;
 * the error message below reports request->host.port. */
230 if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
232 request->buf_size,1,&measOut))) {
233 ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d",
234 xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
235 /* FIXME: tell error to remote */
/* Send the acknowledgement; on failure both sockets opened so far are closed. */
240 if ((errcode=gras_msg_send(expeditor,
241 gras_msgtype_by_name("BW handshake ACK"),
243 ERROR1("Error %s encountered while sending the answer.",
244 xbt_error_name(errcode));
245 gras_socket_close(measMasterIn);
246 gras_socket_close(measOut);
247 /* FIXME: tell error to remote */
/* Accept the incoming measurement connection of the expeditor. */
250 TRY(gras_socket_meas_accept(measMasterIn,&measIn));
251 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
252 answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
/* Mirror of the experiment in amok_bw_test(): receive with the experiment and
 * message sizes on measIn, then send with sizes 1/1 on measOut. */
254 TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
255 TRY(gras_socket_meas_send(measOut,120,1,1));
/* Error report followed by the closing of the three sockets.
 * NOTE(review): the lines surrounding this part are not visible here, so
 * whether it is active code cannot be told from this view. */
258 ERROR1("Error %s encountered while receiving the experiment.",
259 xbt_error_name(errcode));
260 gras_socket_close(measMasterIn);
261 gras_socket_close(measIn);
262 gras_socket_close(measOut);
263 * FIXME: tell error to remote ? *
/* Regular cleanup. Here measMasterIn is the socket closed conditionally and
 * measIn unconditionally, the converse of amok_bw_test(); in both cases each
 * distinct socket gets closed once. */
267 if (measIn != measMasterIn)
268 gras_socket_close(measMasterIn);
269 gras_socket_close(measIn);
270 gras_socket_close(measOut);
273 DEBUG0("BW experiment done.");
278 * \brief request a bandwidth measurement between two remote hosts
280 * Results are reported in last args, and sizes are in kb.
/* Ask the process at from_name:from_port to run a bandwidth experiment against
 * to_name:to_port and wait for its result.
 *
 * buf_size, exp_size and msg_size are copied unchanged into the request (no
 * multiplication by 1024 here). sec and bw are the OUT parameters of the
 * measurement.
 *
 * NOTE(review): several lines are not visible in this view (declarations of
 * sock and result, the copy of the result into sec and bw, the return
 * statement); only the statements shown are described. */
282 xbt_error_t amok_bw_request(const char* from_name,unsigned int from_port,
283 const char* to_name,unsigned int to_port,
284 unsigned long int buf_size,
285 unsigned long int exp_size,
286 unsigned long int msg_size,
287 /*OUT*/ double *sec, double*bw) {
292 bw_request_t request;
295 request=xbt_new0(s_bw_request_t,1);
296 request->buf_size=buf_size;
297 request->exp_size=exp_size;
298 request->msg_size=msg_size;
/* The request designates the target of the experiment; the cast drops the
 * const qualifier of to_name and the string itself is not copied. */
300 request->host.name = (char*)to_name;
301 request->host.port = to_port;
/* Contact the process that has to run the experiment and send it the request. */
303 TRY(gras_socket_client(from_name,from_port,&sock));
304 TRY(gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request));
/* Wait for the "BW result" answer (first argument 240, presumably a timeout in
 * seconds - confirm). */
307 TRY(gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result));
/* NOTE(review): the last two arguments of this log call are on a line not
 * visible here. */
312 VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
313 from_name,from_port, to_name,to_port,
316 gras_socket_close(sock);
/* Callback to the "BW request" message: connect to the host named in the
 * request, run amok_bw_test() against it with the requested sizes and store
 * the measurement in a freshly allocated result.
 *
 * NOTE(review): many lines are not visible in this view (remaining
 * parameters, declaration of peer, sending of the result, return statement). */
320 int amok_bw_cb_bw_request(gras_socket_t expeditor,
324 /* specification of the test to run, and our answer */
325 bw_request_t request = *(bw_request_t*)payload;
/* NOTE(review): allocates an "s_bw_res" while amok_bw_init() describes the
 * structure as "s_bw_res_t" with a "seconds" field; result->sec is used
 * below - confirm the names against the private header. */
326 bw_res_t result = xbt_new0(s_bw_res,1);
/* Open a socket to the target of the experiment, measure, then close it. */
329 TRY(gras_socket_client(request->host.name,request->host.port,&peer));
330 TRY(amok_bw_test(peer,
331 request->buf_size,request->exp_size,request->msg_size,
332 &(result->sec),&(result->bw)));
333 gras_socket_close(peer);
/* NOTE(review): the remaining lines use other API names (gras_msg_ctn,
 * grasbw_test, gras_msg_new_and_send, GRASMSG_BW_RESULT) than the code above;
 * presumably an older implementation disabled by a preprocessor directive
 * that is not visible here - confirm before relying on it. */
339 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
340 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
342 unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
343 unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
344 unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
/* Run the test; on failure report it on stderr and send back an error. */
349 if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
350 &(res[0].value),&(res[1].value) ))) {
352 "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
353 __FILE__,__LINE__,xbt_error_name(error->errcode));
/* NOTE(review): strncpy() does not NUL-terminate when the source fills the
 * whole ERRMSG_LEN bytes; the literal copied here has 24 characters. */
354 strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
355 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Both result entries are stamped with the current time before being sent. */
360 res[0].timestamp = (unsigned int) gras_time();
361 res[1].timestamp = (unsigned int) gras_time();
362 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Callback registered on "SAT start" by amok_bw_init(). The only visible
 * statement logs at critical level that the feature is not implemented.
 * NOTE(review): the remaining parameters and the return statement are not
 * visible in this view. */
370 int amok_bw_cb_sat_start(gras_socket_t expeditor,
372 CRITICAL0("amok_bw_cb_sat_start; not implemented");
/* Callback registered on "SAT begin" by amok_bw_init(). The only visible
 * statement logs at critical level that the feature is not implemented.
 * NOTE(review): the remaining parameters and the return statement are not
 * visible in this view. */
375 int amok_bw_cb_sat_begin(gras_socket_t expeditor,
377 CRITICAL0("amok_bw_cb_sat_begin: not implemented");
381 /* ***************************************************************************
383 * ***************************************************************************/
/* Ask the process at from_name:from_port to start saturating the link toward
 * to_name:to_port, using messages of msgSize and the given timeout.
 *
 * Visible steps: open a client socket to the requester side, allocate the
 * request and target descriptions, send GRASMSG_SAT_START, wait for
 * GRASMSG_SAT_STARTED and check the error code it carries.
 *
 * NOTE(review): this function uses the older gras_sock_ and gras_msg_new_and_send
 * API names; it is presumably legacy code disabled by a preprocessor
 * directive not visible here. Declarations, return statements and closing
 * braces are not visible either. */
385 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
386 const char* to_name,unsigned int to_port,
387 unsigned int msgSize, unsigned int timeout) {
396 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
397 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
398 __FILE__,__LINE__,xbt_error_name(errcode));
/* NOTE(review): when the first malloc succeeds and the second fails, the
 * visible error path only closes the socket; check that request is released. */
401 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
402 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
403 fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
404 gras_sock_close(sock);
408 request->timeout=timeout;
409 request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the capacity of target->host is not visible
 * here - confirm that to_name always fits. */
411 strcpy(target->host,to_name);
412 target->port=to_port;
/* Send the start request; the arguments following the count 2 are on lines
 * not visible here. */
414 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2,
417 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
418 __FILE__,__LINE__,xbt_error_name(errcode));
419 gras_sock_close(sock);
/* Wait for the acknowledgement (first argument 120, presumably a timeout in
 * seconds - confirm). */
422 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
423 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
424 __FILE__,__LINE__,xbt_error_name(errcode));
425 gras_sock_close(sock);
/* A non-zero error code inside the answer means the peer failed: report it,
 * then release the answer and the socket. */
429 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
430 fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
431 __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
432 gras_msg_free(answer);
433 gras_sock_close(sock);
/* Success path: same cleanup. */
437 gras_msg_free(answer);
438 gras_sock_close(sock);
/* Handler of a saturation start request (older message API).
 *
 * Visible steps: read the target host/port and the experiment parameters from
 * the message, negotiate with the target (GRASMSG_SAT_BEGIN answered by
 * GRASMSG_SAT_BEGUN), open a raw socket to the port announced by the target,
 * report to the requester through GRASMSG_SAT_STARTED, keep sending raw data
 * until a GRASMSG_SAT_STOP arrives or the timeout elapses, then tell the
 * target to stop (GRASMSG_SAT_END answered by GRASMSG_SAT_ENDED) and report
 * through GRASMSG_SAT_STOPPED.
 *
 * NOTE(review): presumably legacy code disabled by a preprocessor directive
 * not visible here. Declarations, return statements, closing braces and the
 * statement initializing start are not visible in this view. */
442 int grasbw_cbSatStart(gras_msg_t *msg) {
446 double start; /* time to timeout */
448 /* specification of the test to run */
449 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
450 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
452 unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
453 unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
454 unsigned int raw_port;
/* Debugging dump of the fields of the requester socket. */
462 fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
463 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
464 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
465 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
468 /* Negociate the saturation with the peer */
/* Every failure below is reported twice: on stderr and to the requester
 * through grasRepportError() on msg->sock. */
469 if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
470 fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
471 xbt_error_name(errcode));
472 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
473 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
474 errcode,"Cannot contact peer.\n");
477 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
478 fprintf(stderr,"cbSatStart(): Malloc error\n");
479 gras_sock_close(sock);
480 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
481 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
482 malloc_error,"Cannot build request.\n");
/* Forward the experiment parameters received from the requester. */
486 request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
487 request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
489 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1,
491 fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
492 xbt_error_name(errcode));
493 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
494 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
495 errcode,"Cannot send request.\n");
496 gras_sock_close(sock);
/* Wait for the target to acknowledge (first argument 120, presumably a
 * timeout in seconds - confirm). */
500 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
501 fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
502 xbt_error_name(errcode));
503 gras_sock_close(sock);
505 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
506 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
508 "Cannot receive the ACK.\n");
/* The target answered but reports an error: relay its message. */
512 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
513 fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
514 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
516 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
517 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
519 "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
520 gras_msg_free(answer);
521 gras_sock_close(sock);
/* The second element of the answer carries the raw port opened by the target. */
525 raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
527 if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
528 fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
529 xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
531 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
532 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
533 errcode,"Cannot open raw socket.\n");
534 gras_sock_close(sock);
538 /* send a train of data before repporting that XP is started */
539 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
540 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
541 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
542 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
543 errcode,"Cannot raw send.\n");
544 gras_sock_close(sock);
545 gras_rawsock_close(raw);
/* Success report: the same helper is used with no_error as the code. */
549 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
550 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
551 no_error,"Saturation started");
552 gras_msg_free(answer);
555 /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
/* The wait uses 0 as first argument and the loop goes on while it returns
 * timeout_error. NOTE(review): the received message is stored into msg, the
 * parameter of this function, so msg->sock used further down refers to the
 * SAT_STOP message - confirm this is intended. */
557 while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error &&
558 gras_time()-start < timeout) {
559 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
560 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
561 /* our error message do not interess anyone. SAT_STOP will do nothing. */
562 gras_sock_close(sock);
563 gras_rawsock_close(raw);
/* Loop left because of the overall timeout rather than a SAT_STOP. */
567 if (gras_time()-start > timeout) {
568 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
569 gras_sock_close(sock);
570 gras_rawsock_close(raw);
574 /* Handle the SAT_STOP which broke the previous while */
576 if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
577 fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
579 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
580 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
581 errcode,"Sending SAT_END to peer failed.\n");
582 gras_sock_close(sock);
583 gras_rawsock_close(raw);
587 if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
588 fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
590 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
591 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
592 errcode,"Receiving SAT_ENDED from peer failed.\n");
593 gras_sock_close(sock);
594 gras_rawsock_close(raw);
/* Final report to the requester (its last arguments are on lines not visible
 * here), then release of both sockets and of the answer. */
597 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
598 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
601 gras_sock_close(sock);
602 gras_rawsock_close(raw);
603 gras_msg_free(answer);
/* Handler of a saturation begin request on the receiving side (older message
 * API).
 *
 * Visible steps: allocate the answer parts, open a raw server socket,
 * acknowledge with GRASMSG_SAT_BEGUN, receive raw data until a
 * GRASMSG_SAT_END arrives or the timeout elapses, then report through
 * GRASMSG_SAT_ENDED and close the raw socket.
 *
 * NOTE(review): presumably legacy code disabled by a preprocessor directive
 * not visible here. Declarations, return statements, closing braces and the
 * statement initializing start are not visible in this view. */
609 int grasbw_cbSatBegin(gras_msg_t *msg) {
612 double start; /* timer */
614 unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
615 unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
/* NOTE(review): when the first malloc succeeds and the second fails, no
 * release of request is visible in the error path. */
620 if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
621 !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
622 fprintf(stderr,"cbSatBegin(): Malloc error\n");
623 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
624 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
625 malloc_error,"Malloc error");
/* Open the raw server socket (6666 and 8000 are presumably the bounds of the
 * port range to try - confirm against gras_rawsock_server_open). */
629 if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) {
630 fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
631 xbt_error_name(errcode));
632 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
633 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
634 errcode,"Cannot open raw socket");
/* Announce the port of the raw socket together with an empty error status.
 * NOTE(review): the port is read through a "peer port" accessor on a server
 * socket - confirm it returns the listening port. */
637 request->port=gras_rawsock_get_peer_port(raw);
638 request->msgSize=msgSize;
639 error->errcode=no_error;
640 error->errmsg[0]='\0';
641 if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
644 fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
645 xbt_error_name(errcode));
/* Receive raw data until SAT_END shows up or the timeout elapses. The
 * received message is stored into msg, the parameter of this function. */
651 while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
652 gras_time() - start < timeout) {
653 errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
654 if (errcode != timeout_error && errcode != no_error) {
655 fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
656 /* our error message do not interess anyone. SAT_END will do nothing. */
657 /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
/* Loop left because of the overall timeout rather than a SAT_END. */
661 if (gras_time()-start > timeout) {
662 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
663 gras_rawsock_close(raw);
/* Report the end to the sender of SAT_END (last arguments are on a line not
 * visible here) and close the raw socket. */
667 grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
668 "cbSatBegin: Cannot send SAT_ENDED.\n",
670 gras_rawsock_close(raw);
675 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
676 const char* to_name,unsigned int to_port) {
681 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
682 fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
683 xbt_error_name(errcode));
687 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
688 fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
689 xbt_error_name(errcode));
690 gras_sock_close(sock);
694 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
695 fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
696 xbt_error_name(errcode));
697 gras_sock_close(sock);
701 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
702 fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
703 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
704 gras_msg_free(answer);
705 gras_sock_close(sock);
709 gras_msg_free(answer);
710 gras_sock_close(sock);