3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
/* Logging: declare "bw" as a subcategory of "amok" and, per the macro name,
 * make it the default category for the log macros used in this file. */
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
/* Module state flag: set to 1 by amok_bw_init() and back to 0 by amok_bw_exit(). */
15 static short _amok_bw_initialized = 0;
/* Initialize the bandwidth module.
 *
 * The setup is guarded by a test on _amok_bw_initialized. It:
 *  - describes three structures field by field (s_bw_request_t, s_bw_res_t,
 *    s_sat_request_desc_t) and wraps each description in a reference datatype
 *    (bw_request_t, bw_res_t, sat_request_t);
 *  - declares the "BW ..." and "SAT ..." message types with those payloads;
 *  - registers callbacks for "BW request", "BW handshake", "SAT start" and
 *    "SAT begin";
 *  - sets _amok_bw_initialized to 1.
 *
 * No callback is registered here for "BW result", "BW handshake ACK" or the
 * "SAT started/begun/ended/stopped" messages.
 */
18 void amok_bw_init(void) {
19 gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
23 if (! _amok_bw_initialized) {
25 /* Build the datatype descriptions */
/* Bandwidth request: target host plus the three size parameters of the experiment. */
26 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27 gras_datadesc_struct_append(bw_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
28 gras_datadesc_struct_append(bw_request_desc,"buf_size",gras_datadesc_by_name("unsigned int"));
29 gras_datadesc_struct_append(bw_request_desc,"exp_size",gras_datadesc_by_name("unsigned int"));
30 gras_datadesc_struct_append(bw_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
31 gras_datadesc_struct_close(bw_request_desc);
/* From here on bw_request_desc designates the reference type "bw_request_t", not the struct itself. */
32 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Bandwidth result: remote error, timestamp, duration and bandwidth. */
34 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
35 gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
36 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
37 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
38 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
39 gras_datadesc_struct_close(bw_res_desc);
40 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Saturation request: target host, message size and timeout.
 * NOTE(review): the struct is named "s_sat_request_desc_t" while its reference is
 * "sat_request_t"; the other two pairs follow the s_X_t / X_t pattern - confirm this is intended. */
42 sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
43 gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
44 gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
45 gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
46 gras_datadesc_struct_close(sat_request_desc);
47 sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
49 /* Register the bandwidth messages */
/* The handshake and its ACK both carry a bw_request_t payload. */
50 gras_msgtype_declare("BW request", bw_request_desc);
51 gras_msgtype_declare("BW result", bw_res_desc);
52 gras_msgtype_declare("BW handshake", bw_request_desc);
53 gras_msgtype_declare("BW handshake ACK", bw_request_desc);
55 /* Register the saturation messages */
/* "SAT end" and "SAT stop" are declared with a NULL datatype (presumably: no payload). */
56 gras_msgtype_declare("SAT start", sat_request_desc);
57 gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
58 gras_msgtype_declare("SAT begin", sat_request_desc);
59 gras_msgtype_declare("SAT begun", gras_datadesc_by_name("amok_remoterr_t"));
60 gras_msgtype_declare("SAT end", NULL);
61 gras_msgtype_declare("SAT ended", gras_datadesc_by_name("amok_remoterr_t"));
62 gras_msgtype_declare("SAT stop", NULL);
63 gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
66 /* Register the callbacks */
67 gras_cb_register(gras_msgtype_by_name("BW request"),
68 &amok_bw_cb_bw_request);
69 gras_cb_register(gras_msgtype_by_name("BW handshake"),
70 &amok_bw_cb_bw_handshake);
72 gras_cb_register(gras_msgtype_by_name("SAT start"),
73 &amok_bw_cb_sat_start);
74 gras_cb_register(gras_msgtype_by_name("SAT begin"),
75 &amok_bw_cb_sat_begin);
77 _amok_bw_initialized =1;
/* Shut the bandwidth module down.
 *
 * Tests _amok_bw_initialized first, then unregisters the same four callbacks
 * that amok_bw_init() registered ("BW request", "BW handshake", "SAT start",
 * "SAT begin") and resets _amok_bw_initialized to 0.
 * The message types and datatype descriptions are not undeclared here.
 */
80 void amok_bw_exit(void) {
81 if (! _amok_bw_initialized)
84 gras_cb_unregister(gras_msgtype_by_name("BW request"),
85 &amok_bw_cb_bw_request);
86 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
87 &amok_bw_cb_bw_handshake);
89 gras_cb_unregister(gras_msgtype_by_name("SAT start"),
90 &amok_bw_cb_sat_start);
91 gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
92 &amok_bw_cb_sat_begin);
94 _amok_bw_initialized = 0;
99 /* ***************************************************************************
101 * ***************************************************************************/
106 * Conduct a bandwidth test between the local host and @peer, and
107 * report the elapsed time and the bandwidth through the last two arguments (@sec and @bw)
/* Run one bandwidth experiment against the host at the other end of `peer`.
 *
 * peer     - already-connected socket used for the handshake messages
 * buf_size - passed to gras_socket_server_ext()/gras_socket_client_ext() for both
 *            measurement sockets, and forwarded to the peer in the request
 * exp_size - amount of data given to gras_socket_meas_send(); also the numerator of *bw
 * msg_size - message size given to gras_socket_meas_send()
 * sec      - OUT: elapsed time of the experiment
 * bw       - OUT: exp_size / *sec / (1024*1024)
 *
 * Sequence: open a local measurement server socket, send a "BW handshake"
 * carrying its port, wait for the "BW handshake ACK" carrying the peer's port,
 * connect a measurement client socket to that port, send the experiment, wait
 * for a 1/1 message back on the server socket, compute the results, and close
 * both measurement sockets.
 *
 * Each failure is logged with ERRORn using xbt_error_name(errcode).
 */
109 xbt_error_t amok_bw_test(gras_socket_t peer,
110 unsigned int buf_size,unsigned int exp_size,unsigned int msg_size,
111 /*OUT*/ double *sec, double *bw) {
112 gras_socket_t measIn,measOut; /* Measurement sockets for the experiments */
113 gras_socket_t sock_dummy; /* ignored arg to msg_wait */
116 bw_request_t request,request_ack;
/* Port scan with an empty loop body: the increment expression does the work.
 * The first port tried is 5001 (pre-increment), and the scan goes on only while
 * gras_socket_server_ext() reports system_error. */
118 for (port = 5000, errcode = system_error;
119 errcode == system_error;
120 errcode = gras_socket_server_ext(++port,buf_size,1,&measIn));
121 if (errcode != no_error) {
122 ERROR1("Error %s encountered while opening a measurement socket",
123 xbt_error_name(errcode));
/* Build the handshake request.
 * NOTE(review): no release of `request` (or of `request_ack`) is visible in this function - check for a leak. */
127 request=xbt_new0(s_bw_request_t,1);
128 request->buf_size=buf_size;
129 request->exp_size=exp_size;
130 request->msg_size=msg_size;
/* The host name is left NULL; the handshake callback takes it from the socket (gras_socket_peer_name). */
131 request->host.name = NULL;
132 request->host.port = gras_socket_my_port(measIn);
133 INFO3("Handshaking with %s:%d to connect it back on my %d",
134 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port);
/* NOTE(review): on the failure paths of the send, the wait and the client connect
 * below, no gras_socket_close(measIn) is visible - check for a socket leak. */
136 if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
137 ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
/* Wait for the ACK with a timeout argument of 60 (presumably seconds). */
140 if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),&sock_dummy,&request_ack))) {
141 ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
142 xbt_error_name(errcode));
146 /* FIXME: What if there is a remote error? */
/* Connect to the port announced in the ACK, on the same host as `peer`. */
148 if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),request_ack->host.port, buf_size,1,&measOut))) {
149 ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
150 xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
/* Send the experiment, then wait for a 1/1 message on measIn.
 * `||` short-circuits, so the receive is attempted only when the send succeeded. */
157 if ((errcode=gras_socket_meas_send(measOut,120,exp_size,msg_size)) ||
158 (errcode=gras_socket_meas_recv(measIn,120,1,1))) {
159 ERROR1("Error %s encountered while sending the BW experiment.",
160 xbt_error_name(errcode));
161 gras_socket_close(measOut);
162 gras_socket_close(measIn);
/* NOTE(review): this subtraction assumes *sec already holds the start time taken
 * before the experiment - confirm. */
165 *sec = gras_os_time() - *sec;
/* exp_size divided by the duration and by 2^20 (the "8.0" factor is commented out). */
166 *bw = ((double)exp_size /* 8.0*/) / *sec / (1024.0 *1024.0);
169 gras_socket_close(measIn);
170 gras_socket_close(measOut);
175 /* Callback to the "BW handshake" message:
176 opens a server measurement socket and a client one back to the sender,
177 indicates the server port in a "BW handshake ACK" message,
178 receives the corresponding data on the measurement socket,
179 closes the measurement sockets
/* Callback for "BW handshake" (registered in amok_bw_init()); it is the peer side
 * of amok_bw_test().
 *
 * expeditor - socket the handshake came from; the ACK is sent back on it and its
 *             peer name is used as the host to connect back to
 * payload   - read as a pointer to a bw_request_t (see the cast below)
 *
 * Sequence: open a measurement server socket, connect a measurement client
 * socket back to the requester's announced port, answer with a
 * "BW handshake ACK" carrying the local server port, receive the experiment
 * on the server socket, send a 1/1 message back on the client socket, and
 * close both sockets.
 *
 * Failures are logged locally only; the FIXMEs mark that the remote side is
 * not told about them.
 */
181 int amok_bw_cb_bw_handshake(gras_socket_t expeditor,
183 gras_socket_t measIn,measOut;
184 bw_request_t request=*(bw_request_t*)payload;
189 INFO2("Handshaked to connect to %s:%d",
190 gras_socket_peer_name(expeditor),request->host.port);
/* NOTE(review): no release of `answer` is visible in this function - check for a leak. */
192 answer = xbt_new0(s_bw_request_t,1);
/* Port scan with an empty loop body; the first port tried is 6001, and the scan
 * goes on only while gras_socket_server_ext() reports system_error. */
194 for (port = 6000, errcode = system_error;
195 errcode == system_error;
196 errcode = gras_socket_server_ext(++port,request->buf_size,1,&measIn));
197 if (errcode != no_error) {
198 ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
199 /* FIXME: tell error to remote */
/* Connect back before sending the ACK: the requester opened its server socket
 * before it sent the handshake, so the port in the request is already listening. */
203 if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),request->host.port,
204 request->buf_size,1,&measOut))) {
205 ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d",
206 xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
207 /* FIXME: tell error to remote */
/* The answer echoes the requested sizes and announces the local measurement port. */
211 answer->buf_size=request->buf_size;
212 answer->exp_size=request->exp_size;
213 answer->msg_size=request->msg_size;
214 answer->host.port=gras_socket_my_port(measIn);
216 if ((errcode=gras_msg_send(expeditor,gras_msgtype_by_name("BW handshake ACK"),&answer))) {
217 ERROR1("Error %s encountered while sending the answer.",
218 xbt_error_name(errcode));
219 gras_socket_close(measIn);
220 gras_socket_close(measOut);
221 /* FIXME: tell error to remote */
224 INFO4("BW handshake answered. buf_size=%d exp_size=%d msg_size=%d port=%d",
225 answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
/* Mirror of the sender: receive the experiment first, then send the 1/1 message;
 * `||` short-circuits, so the send happens only after a successful receive. */
227 if ((errcode=gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size)) ||
228 (errcode=gras_socket_meas_send(measOut,120,1,1))) {
229 ERROR1("Error %s encountered while receiving the experiment.",
230 xbt_error_name(errcode));
231 gras_socket_close(measIn);
232 gras_socket_close(measOut);
233 /* FIXME: tell error to remote ? */
236 gras_socket_close(measIn);
237 gras_socket_close(measOut);
/* Callback registered for "BW request" in amok_bw_init().
 * Placeholder: the only visible action is logging "Not implemented" through CRITICAL0. */
241 int amok_bw_cb_bw_request(gras_socket_t expeditor,
243 CRITICAL0("Not implemented");
/* Callback registered for "SAT start" in amok_bw_init().
 * Placeholder: the only visible action is logging "Not implemented" through CRITICAL0. */
247 int amok_bw_cb_sat_start(gras_socket_t expeditor,
249 CRITICAL0("Not implemented");
/* Callback registered for "SAT begin" in amok_bw_init().
 * Placeholder: the only visible action is logging "Not implemented" through CRITICAL0. */
252 int amok_bw_cb_sat_begin(gras_socket_t expeditor,
254 CRITICAL0("Not implemented");
259 /* function to request a BW test between two external hosts */
/* Ask the host at from_name:from_port to run a bandwidth test toward
 * to_name:to_port, and hand the result back in *sec and *bw.
 *
 * bufSize, expSize, msgSize - experiment parameters copied into the request
 * sec, bw                   - OUT: values 0 and 1 of the second part of the answer
 *
 * NOTE(review): this function uses a different message API than amok_bw_test()
 * (gras_sock_client_open, gras_msg_new_and_send, GRASMSG_* constants);
 * presumably older code - confirm whether it is still compiled.
 */
260 xbt_error_t grasbw_request(const char* from_name,unsigned int from_port,
261 const char* to_name,unsigned int to_port,
262 unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
263 /*OUT*/ double *sec, double*bw) {
/* Contact the host that will run the test. */
272 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
273 fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
274 xbt_error_name(errcode));
/* NOTE(review): if the first malloc succeeds and the second fails, no free of
 * `request` is visible on this path. */
277 if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
278 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
279 fprintf(stderr,"grasbw_test(): Malloc error\n");
280 gras_sock_close(sock);
284 request->bufSize=bufSize;
285 request->expSize=expSize;
286 request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the capacity of target->host is not checked here. */
287 strcpy(target->host,to_name);
288 target->port=to_port;
290 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2,
293 fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
294 xbt_error_name(errcode));
295 gras_sock_close(sock);
/* Wait for the result with a timeout argument of 240 (presumably seconds). */
298 if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
299 fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
300 xbt_error_name(errcode));
301 gras_sock_close(sock);
/* The first part of the answer is a msgError_t; a non-zero errcode means the peer failed. */
305 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
306 fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
307 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
308 gras_msg_free(answer);
309 gras_sock_close(sock);
313 /* fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t)); */
/* The second part of the answer holds two msgResult_t: [0] is the duration, [1] the bandwidth. */
314 *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
315 *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
317 gras_msg_free(answer);
318 gras_sock_close(sock);
/* Handler for a bandwidth-test request message.
 *
 * Reads the target host/port from the first part of `msg` and the experiment
 * sizes from the second part, runs grasbw_test() toward that target, and sends
 * a GRASMSG_BW_RESULT back on msg->sock in both the error and the success case.
 */
322 int grasbw_cbBWRequest(gras_msg_t *msg) {
323 /* specification of the test to run */
324 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
325 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
327 unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
328 unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
329 unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
/* One error record and two result records: res[0] and res[1] receive the two
 * outputs of grasbw_test() below. */
334 if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
335 !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
336 fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
/* The test's return code is stored directly in the error record sent back. */
340 if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
341 &(res[0].value),&(res[1].value) ))) {
343 "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
344 __FILE__,__LINE__,xbt_error_name(error->errcode));
/* NOTE(review): strncpy does not NUL-terminate when the source fills the whole
 * buffer; harmless only if ERRMSG_LEN exceeds the length of this literal - confirm. */
345 strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
346 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Success: both result records get a timestamp (two separate gras_time() calls). */
351 res[0].timestamp = (unsigned int) gras_time();
352 res[1].timestamp = (unsigned int) gras_time();
353 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
360 /* ***************************************************************************
362 * ***************************************************************************/
/* Ask the host at from_name:from_port to start saturating the link toward
 * to_name:to_port.
 *
 * msgSize - message size copied into the request
 * timeout - timeout copied into the request
 *
 * Sends a GRASMSG_SAT_START, waits for GRASMSG_SAT_STARTED (timeout argument
 * 120) and checks the error record it carries.
 */
364 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
365 const char* to_name,unsigned int to_port,
366 unsigned int msgSize, unsigned int timeout) {
375 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
376 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
377 __FILE__,__LINE__,xbt_error_name(errcode));
/* NOTE(review): if the first malloc succeeds and the second fails, no free of
 * `request` is visible on this path. */
380 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
381 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
382 fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
383 gras_sock_close(sock);
387 request->timeout=timeout;
388 request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the capacity of target->host is not checked here. */
390 strcpy(target->host,to_name);
391 target->port=to_port;
393 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2,
396 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
397 __FILE__,__LINE__,xbt_error_name(errcode));
398 gras_sock_close(sock);
401 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
402 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
403 __FILE__,__LINE__,xbt_error_name(errcode));
404 gras_sock_close(sock);
/* A non-zero errcode in the ACK means the remote side could not start the saturation. */
408 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
409 fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
410 __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
411 gras_msg_free(answer);
412 gras_sock_close(sock);
416 gras_msg_free(answer);
417 gras_sock_close(sock);
/* Handler for a "start saturation" request.
 *
 * Reads the target host/port (first part of `msg`) and the message size and
 * timeout (second part), then:
 *  1. negotiates with the target: sends GRASMSG_SAT_BEGIN and waits for
 *     GRASMSG_SAT_BEGUN, which carries the raw port to use;
 *  2. opens a raw client socket to that port and sends a first train of data;
 *  3. reports to the requester through GRASMSG_SAT_STARTED;
 *  4. keeps sending on the raw socket until a GRASMSG_SAT_STOP arrives or the
 *     timeout elapses;
 *  5. on stop, sends GRASMSG_SAT_END to the target, waits for
 *     GRASMSG_SAT_ENDED, and reports to the requester through
 *     GRASMSG_SAT_STOPPED.
 *
 * Every failure before step 4 is reported to the requester with
 * grasRepportError() on msg->sock.
 */
421 int grasbw_cbSatStart(gras_msg_t *msg) {
425 double start; /* time to timeout */
427 /* specification of the test to run */
428 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
429 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
431 unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
432 unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
433 unsigned int raw_port;
/* Debug trace of the incoming socket's fields. */
441 fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
442 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
443 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
444 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
447 /* Negotiate the saturation with the peer */
448 if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
449 fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
450 xbt_error_name(errcode));
451 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
452 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
453 errcode,"Cannot contact peer.\n");
456 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
457 fprintf(stderr,"cbSatStart(): Malloc error\n");
458 gras_sock_close(sock);
459 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
460 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
461 malloc_error,"Cannot build request.\n");
/* Forward the requester's timeout and message size to the target. */
465 request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
466 request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
468 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1,
470 fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
471 xbt_error_name(errcode));
472 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
473 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
474 errcode,"Cannot send request.\n");
475 gras_sock_close(sock);
479 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
480 fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
481 xbt_error_name(errcode));
482 gras_sock_close(sock);
484 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
485 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
487 "Cannot receive the ACK.\n");
/* The target reported its own failure in the ACK: relay it to the requester. */
491 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
492 fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
493 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
495 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
496 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
498 "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
499 gras_msg_free(answer);
500 gras_sock_close(sock);
/* The second part of the ACK carries the raw port opened by the target. */
504 raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
506 if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
507 fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
508 xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
510 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
511 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
512 errcode,"Cannot open raw socket.\n");
513 gras_sock_close(sock);
517 /* send a train of data before reporting that XP is started */
518 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
519 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
520 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
521 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
522 errcode,"Cannot raw send.\n");
523 gras_sock_close(sock);
524 gras_rawsock_close(raw);
/* Success report: grasRepportError() is also the channel for the no_error status. */
528 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
529 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
530 no_error,"Saturation started");
531 gras_msg_free(answer);
534 /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
/* The wait uses a timeout argument of 0 and stores any received message into `msg`,
 * replacing the handler's parameter; the loop goes on while the wait reports
 * timeout_error and the elapsed time is below `timeout`.
 * NOTE(review): the initialization of `start` is not visible here - confirm it is
 * set before this loop. */
536 while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error &&
537 gras_time()-start < timeout) {
538 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
539 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
540 /* our error message is of no interest to anyone. SAT_STOP will do nothing. */
541 gras_sock_close(sock);
542 gras_rawsock_close(raw);
546 if (gras_time()-start > timeout) {
547 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
548 gras_sock_close(sock);
549 gras_rawsock_close(raw);
553 /* Handle the SAT_STOP which broke the previous while */
555 if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
556 fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
558 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
559 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
560 errcode,"Sending SAT_END to peer failed.\n");
561 gras_sock_close(sock);
562 gras_rawsock_close(raw);
566 if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
567 fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
569 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
570 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
571 errcode,"Receiving SAT_ENDED from peer failed.\n");
572 gras_sock_close(sock);
573 gras_rawsock_close(raw);
/* Final report to whoever sent the SAT_STOP (msg now designates that message). */
576 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
577 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
580 gras_sock_close(sock);
581 gras_rawsock_close(raw);
582 gras_msg_free(answer);
/* Handler for a "begin saturation" request (receiving side of the saturation).
 *
 * Reads the message size and timeout from the first part of `msg`, opens a raw
 * server socket, answers with GRASMSG_SAT_BEGUN carrying the port to use, then
 * keeps receiving on the raw socket until a GRASMSG_SAT_END arrives or the
 * timeout elapses, and finally reports through GRASMSG_SAT_ENDED.
 */
588 int grasbw_cbSatBegin(gras_msg_t *msg) {
591 double start; /* timer */
593 unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
594 unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
/* NOTE(review): if the first malloc succeeds and the second fails, no free of
 * `request` is visible on this path. */
599 if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
600 !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
601 fprintf(stderr,"cbSatBegin(): Malloc error\n");
602 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
603 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
604 malloc_error,"Malloc error");
/* Open the raw server socket; 6666 and 8000 are passed as the first two arguments
 * (presumably a port range - confirm against gras_rawsock_server_open). */
608 if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) {
609 fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
610 xbt_error_name(errcode));
611 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
612 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
613 errcode,"Cannot open raw socket");
/* NOTE(review): the port announced to the sender comes from the *peer* port getter
 * of a server socket - confirm that this yields the listening port. */
616 request->port=gras_rawsock_get_peer_port(raw);
617 request->msgSize=msgSize;
/* Success ACK: no error code and an empty error message. */
618 error->errcode=no_error;
619 error->errmsg[0]='\0';
620 if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
623 fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
624 xbt_error_name(errcode));
/* Receive loop: the wait uses a timeout argument of 0 and stores any received
 * message into `msg`; it goes on while the wait reports timeout_error and the
 * elapsed time is below `timeout`.
 * NOTE(review): the initialization of `start` is not visible here - confirm it is
 * set before this loop. */
630 while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
631 gras_time() - start < timeout) {
632 errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
633 if (errcode != timeout_error && errcode != no_error) {
634 fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
635 /* our error message is of no interest to anyone. SAT_END will do nothing. */
636 /* (if timed out, it may be because the sender stopped emission, so survive it) */
640 if (gras_time()-start > timeout) {
641 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
642 gras_rawsock_close(raw);
/* Acknowledge the end to the sender of the SAT_END message. */
646 grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
647 "cbSatBegin: Cannot send SAT_ENDED.\n",
649 gras_rawsock_close(raw);
/* Ask the host at from_name:from_port to stop its saturation experiment.
 *
 * Sends a GRASMSG_SAT_STOP with no payload part, waits for GRASMSG_SAT_STOPPED
 * (timeout argument 120) and checks the error record it carries.
 * to_name and to_port are not used in the statements visible here.
 */
654 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
655 const char* to_name,unsigned int to_port) {
660 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
661 fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
662 xbt_error_name(errcode));
666 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
667 fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
668 xbt_error_name(errcode));
669 gras_sock_close(sock);
673 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
674 fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
675 xbt_error_name(errcode));
676 gras_sock_close(sock);
/* A non-zero errcode in the ACK means the remote side failed to stop cleanly. */
680 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
681 fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
682 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
683 gras_msg_free(answer);
684 gras_sock_close(sock);
688 gras_msg_free(answer);
689 gras_sock_close(sock);