3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "amok/Bandwidth/bandwidth_private.h"
12 #include "gras/messages.h"
/* Logging subcategory "bw" (parent "amok", description "Bandwidth testing");
 * presumably the category used by the VERB/DEBUG/ERROR/CRITICAL macros below. */
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
/* Set to 1 at the end of amok_bw_init() and reset to 0 by amok_bw_exit(). */
16 static short _amok_bw_initialized = 0;
18 /** @brief module initialization; all participating nodes must run this */
/* While _amok_bw_initialized is clear, this builds the three data descriptions
 * of the module, declares the bandwidth and saturation message types,
 * registers the four callbacks defined in this file and finally sets the flag.
 *
 * NOTE(review): the embedded line numbers skip (e.g. 20 -> 24, 82 -> 85), so
 * some statements and the closing braces are not visible in this listing. */
19 void amok_bw_init(void) {
20 gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
24 if (! _amok_bw_initialized) {
26 /* Build the datatype descriptions */
/* Bandwidth request: a host plus the buffer, experiment and message sizes. */
27 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
28 gras_datadesc_struct_append(bw_request_desc,"host",
29 gras_datadesc_by_name("xbt_host_t"));
30 gras_datadesc_struct_append(bw_request_desc,"buf_size",
31 gras_datadesc_by_name("unsigned long int"));
32 gras_datadesc_struct_append(bw_request_desc,"exp_size",
33 gras_datadesc_by_name("unsigned long int"));
34 gras_datadesc_struct_append(bw_request_desc,"msg_size",
35 gras_datadesc_by_name("unsigned long int"));
36 gras_datadesc_struct_close(bw_request_desc);
/* From here on the variable names the "bw_request_t" reference description. */
37 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Bandwidth result: error, timestamp, duration and bandwidth.
 * NOTE(review): the third field is labelled "seconds" here while the code
 * below accesses result->sec; confirm the label is only informative. */
39 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
40 gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
41 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
42 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
43 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
44 gras_datadesc_struct_close(bw_res_desc);
45 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Saturation request: a host, a message size and a timeout.
 * NOTE(review): the struct is named "s_sat_request_desc_t", unlike the
 * "s_<name>_t" pattern of the two descriptions above; confirm it is intended. */
47 sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
48 gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
49 gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
50 gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
51 gras_datadesc_struct_close(sat_request_desc);
52 sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
54 /* Register the bandwidth messages */
/* The handshake, its ACK and the request all share the bw_request_t payload. */
55 gras_msgtype_declare("BW handshake", bw_request_desc);
56 gras_msgtype_declare("BW handshake ACK", bw_request_desc);
57 gras_msgtype_declare("BW request", bw_request_desc);
58 gras_msgtype_declare("BW result", bw_res_desc);
60 /* Register the saturation messages */
/* "SAT end" and "SAT stop" are declared with a NULL payload description. */
61 gras_msgtype_declare("SAT start", sat_request_desc);
62 gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
63 gras_msgtype_declare("SAT begin", sat_request_desc);
64 gras_msgtype_declare("SAT begun", gras_datadesc_by_name("amok_remoterr_t"));
65 gras_msgtype_declare("SAT end", NULL);
66 gras_msgtype_declare("SAT ended", gras_datadesc_by_name("amok_remoterr_t"));
67 gras_msgtype_declare("SAT stop", NULL);
68 gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
71 /* Register the callbacks */
/* The same four (message type, callback) pairs are removed by amok_bw_exit(). */
72 gras_cb_register(gras_msgtype_by_name("BW request"),
73 &amok_bw_cb_bw_request);
74 gras_cb_register(gras_msgtype_by_name("BW handshake"),
75 &amok_bw_cb_bw_handshake);
77 gras_cb_register(gras_msgtype_by_name("SAT start"),
78 &amok_bw_cb_sat_start);
79 gras_cb_register(gras_msgtype_by_name("SAT begin"),
80 &amok_bw_cb_sat_begin);
82 _amok_bw_initialized =1;
85 /** @brief module finalization */
/* Unregisters the four callbacks installed by amok_bw_init() and clears the
 * initialization flag.
 *
 * NOTE(review): the embedded numbering jumps from 87 to 90, so the statement
 * guarded by the `if` below is not visible in this listing (presumably an
 * early return when the module was never initialized). */
86 void amok_bw_exit(void) {
87 if (! _amok_bw_initialized)
90 gras_cb_unregister(gras_msgtype_by_name("BW request"),
91 &amok_bw_cb_bw_request);
92 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
93 &amok_bw_cb_bw_handshake);
95 gras_cb_unregister(gras_msgtype_by_name("SAT start"),
96 &amok_bw_cb_sat_start);
97 gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
98 &amok_bw_cb_sat_begin);
100 _amok_bw_initialized = 0;
103 /* ***************************************************************************
105 * ***************************************************************************/
108 * \brief bandwidth measurement between localhost and \e peer
110 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
111 * \arg buf_size: Size of the socket buffer
112 * \arg exp_size: Total size of data sent across the network
113 * \arg msg_size: Size of each message sent, i.e. (\e exp_size / \e msg_size) messages will be sent.
114 * \arg sec: where the result (in seconds) should be stored.
115 * \arg bw: observed Bandwidth (in kb/s)
117 * Conduct a bandwidth test from the local process to the given peer.
118 * This call is blocking until the end of the experiment.
120 * Results are reported in last args, and sizes are in kb.
/*
 * Conduct a bandwidth experiment from the local process to \e peer.
 *
 * buf_size, exp_size and msg_size are given in kb and are converted to bytes
 * (x1024) when copied into the request. Visible sequence:
 *   1. open a local measurement server socket on a port in [5000,10000);
 *   2. send "BW handshake" carrying the sizes and that socket's port;
 *   3. call gras_socket_meas_accept() on the server socket;
 *   4. wait up to 60s for "BW handshake ACK";
 *   5. open a measurement client socket to the port given in the ACK;
 *   6. send exp_size bytes in msg_size chunks, then receive on measIn
 *      (presumably a 1-byte acknowledgement).
 * On success *sec receives the elapsed time and *bw is exp_size / *sec, where
 * exp_size is the caller's value in kb.
 *
 * NOTE(review): the embedded line numbers skip in several places (e.g.
 * 129 -> 132, 137 -> 143, 196 -> 200): local declarations, the error-handling
 * framing around the calls and the return statement are not visible in this
 * listing and are left untouched.
 */
122 xbt_error_t amok_bw_test(gras_socket_t peer,
123 unsigned long int buf_size,
124 unsigned long int exp_size,
125 unsigned long int msg_size,
126 /*OUT*/ double *sec, double *bw) {
128 /* Measurement sockets for the experiments */
129 gras_socket_t measMasterIn=NULL,measIn,measOut;
132 bw_request_t request,request_ack;
/* Only the loop header advances port, so every port of the range is tried in
 * turn, as in amok_bw_cb_bw_handshake().
 * NOTE(review): buf_size is passed here in kb whereas the other measurement
 * sockets of this file receive request->buf_size (bytes); confirm the unit. */
135 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
137 measMasterIn = gras_socket_server_ext(port,buf_size,1);
143 RETHROW0("Error caught while opening a measurement socket: %s");
/* Build the request: sizes in bytes, plus the port the peer must connect back to. */
148 request=xbt_new0(s_bw_request_t,1);
149 request->buf_size=buf_size*1024;
150 request->exp_size=exp_size*1024;
151 request->msg_size=msg_size*1024;
152 request->host.name = NULL;
153 request->host.port = gras_socket_my_port(measMasterIn);
154 VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%lu kb= %lu b)",
155 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
156 exp_size,request->exp_size);
/* The message payload is the address of the request pointer. */
159 gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request);
161 RETHROW0("Error encountered while sending the BW request: %s");
163 measIn = gras_socket_meas_accept(measMasterIn);
166 gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),NULL,&request_ack);
168 RETHROW0("Error encountered while waiting for the answer to BW request: %s");
171 /* FIXME: What if there is a remote error? */
/* Connect to the measurement port announced by the peer in its ACK. */
174 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
175 request_ack->host.port,
176 request->buf_size,1);
/* The format ends with %s like the other RETHROW calls of this file, so the
 * text of the caught exception is kept in the rethrown message. */
178 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
179 gras_socket_peer_name(peer),request_ack->host.port);
181 DEBUG1("Got ACK; conduct the experiment (msg_size=%lu)",request->msg_size);
/* The experiment itself; both calls are given 120 as second argument
 * (presumably a timeout in seconds). */
184 gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
185 gras_socket_meas_recv(measIn,120,1,1);
/* Failure path: report the error and close the three measurement sockets. */
188 ERROR1("Error %s encountered while sending the BW experiment.",
189 xbt_error_name(errcode));
190 gras_socket_close(measOut);
191 gras_socket_close(measMasterIn);
192 gras_socket_close(measIn);
/* Assumes *sec was loaded with the start time before the experiment; that
 * assignment is not visible in this listing. */
195 *sec = gras_os_time() - *sec;
196 *bw = ((double)exp_size) / *sec;
/* measIn is closed separately only when it is a different socket from measMasterIn. */
200 if (measIn != measMasterIn)
201 gras_socket_close(measIn);
202 gras_socket_close(measMasterIn);
203 gras_socket_close(measOut);
208 /* Callback to the "BW handshake" message:
209 opens a server measurement socket,
210 indicate its port in a "BW handshake ACK" message,
211 receive the corresponding data on the measurement socket,
212 close the measurement socket
214 sizes are in bytes (converted from kb by the expeditor)
/* Callback for the "BW handshake" message: receiving side of the experiment.
 *
 * Visible sequence: open a measurement server socket on a port in
 * [6000,10000), build an answer echoing the sizes and carrying that port,
 * open a measurement client socket back to the expeditor, send
 * "BW handshake ACK", accept on the server socket, receive exp_size bytes in
 * msg_size chunks, send a reply on measOut and close the sockets.
 *
 * NOTE(review): the embedded line numbers skip (e.g. 216 -> 218, 219 -> 224,
 * 251 -> 253): the second parameter, the declarations of answer/port, the
 * error-handling framing and the return are not visible in this listing. */
216 int amok_bw_cb_bw_handshake(gras_socket_t expeditor,
218 gras_socket_t measMasterIn=NULL,measIn,measOut;
/* The payload is read as a pointer to a request pointer. */
219 bw_request_t request=*(bw_request_t*)payload;
224 VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
225 gras_socket_peer_name(expeditor),request->host.port,
226 request->buf_size,request->exp_size,request->msg_size);
228 /* Build our answer */
229 answer = xbt_new0(s_bw_request_t,1);
/* Look for a usable port; the loop stops as soon as a socket was obtained. */
231 for (port = 6000; port < 10000 && measMasterIn == NULL; port++) {
233 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
239 /* FIXME: tell error to remote */
240 RETHROW0("Error encountered while opening a measurement server socket: %s");
/* Echo the sizes and tell the expeditor which port to connect to. */
244 answer->buf_size=request->buf_size;
245 answer->exp_size=request->exp_size;
246 answer->msg_size=request->msg_size;
247 answer->host.port=gras_socket_my_port(measMasterIn);
249 /* Don't connect asap to leave time to other side to enter the accept() */
/* NOTE(review): the middle argument line (numbered 252) is not visible;
 * presumably the port received in the request. */
251 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
253 request->buf_size,1);
255 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
256 gras_socket_peer_name(expeditor),request->host.port);
257 /* FIXME: tell error to remote */
/* The ACK payload is the address of the answer pointer. */
261 gras_msg_send(expeditor, gras_msgtype_by_name("BW handshake ACK"), &answer);
/* Presumably the failure path of the send: close what was opened, then rethrow. */
263 gras_socket_close(measMasterIn);
264 gras_socket_close(measOut);
265 /* FIXME: tell error to remote */
266 RETHROW0("Error encountered while sending the answer: %s");
270 measIn = gras_socket_meas_accept(measMasterIn);
271 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
272 answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
/* Mirror of the sender: receive the experiment data, then send the reply. */
274 gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
275 gras_socket_meas_send(measOut,120,1,1);
/* Presumably the failure path of the experiment: close everything, then rethrow. */
277 gras_socket_close(measMasterIn);
278 gras_socket_close(measIn);
279 gras_socket_close(measOut);
280 /* FIXME: tell error to remote ? */
281 RETHROW0("Error encountered while receiving the experiment: %s");
/* measMasterIn is closed separately only when it differs from measIn. */
284 if (measIn != measMasterIn)
285 gras_socket_close(measMasterIn);
286 gras_socket_close(measIn);
287 gras_socket_close(measOut);
290 DEBUG0("BW experiment done.");
295 * \brief request a bandwidth measurement between two remote hosts
297 * \arg from_name: Name of the first host
298 * \arg from_port: port on which the first process is listening for messages
299 * \arg to_name: Name of the second host
300 * \arg to_port: port on which the second process is listening (for messages, do not
301 * give a measurement socket here. The needed measurement sockets will be created
302 * automatically and negotiated between the peers)
303 * \arg buf_size: Size of the socket buffer
304 * \arg exp_size: Total size of data sent across the network
305 * \arg msg_size: Size of each message sent. (\e exp_size / \e msg_size) messages will be sent.
306 * \arg sec: where the result (in seconds) should be stored.
307 * \arg bw: observed Bandwidth (in kb/s)
309 * Conduct a bandwidth test from the process from_host:from_port to to_host:to_port.
310 * This call is blocking until the end of the experiment.
312 * Results are reported in last args, and sizes are in kb.
/* Asks the process listening on from_name:from_port to run a bandwidth test
 * towards to_name:to_port: sends it a "BW request" and waits up to 240s for
 * the matching "BW result".
 *
 * The sizes are copied into the request unchanged; the x1024 conversion
 * visible in this file is done by amok_bw_test().
 *
 * NOTE(review): the embedded line numbers skip (e.g. 319 -> 323, 338 -> 343,
 * 344 -> 347): the declarations of sock/result, the copy of the result into
 * *sec and *bw, the last VERB6 arguments and the return are not visible in
 * this listing. */
314 xbt_error_t amok_bw_request(const char* from_name,unsigned int from_port,
315 const char* to_name,unsigned int to_port,
316 unsigned long int buf_size,
317 unsigned long int exp_size,
318 unsigned long int msg_size,
319 /*OUT*/ double *sec, double*bw) {
323 bw_request_t request;
326 request=xbt_new0(s_bw_request_t,1);
327 request->buf_size=buf_size;
328 request->exp_size=exp_size;
329 request->msg_size=msg_size;
/* The request names the target of the test; the cast drops the const
 * qualifier of to_name, the string itself is not copied. */
331 request->host.name = (char*)to_name;
332 request->host.port = to_port;
/* Contact the process that will run the test and hand it the request. */
334 sock = gras_socket_client(from_name,from_port);
335 gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request);
338 gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result);
343 VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
344 from_name,from_port, to_name,to_port,
347 gras_socket_close(sock);
/* Callback for the "BW request" message: opens a client socket to the host
 * named in the request, runs the test described by the request (the visible
 * arguments are the three sizes and the addresses of result->sec and
 * result->bw) and sends the result back to the expeditor as "BW result".
 *
 * NOTE(review): the embedded line numbers skip (352 -> 355, 357 -> 360,
 * 360 -> 362): the second parameter, the declaration of peer, the head of the
 * call whose arguments appear below and the return are not visible. */
352 int amok_bw_cb_bw_request(gras_socket_t expeditor,
355 /* specification of the test to run, and our answer */
356 bw_request_t request = *(bw_request_t*)payload;
/* NOTE(review): allocates with the type name s_bw_res while the data
 * description is registered as "s_bw_res_t"; confirm the type name against
 * the private header. */
357 bw_res_t result = xbt_new0(s_bw_res,1);
360 peer = gras_socket_client(request->host.name,request->host.port);
362 request->buf_size,request->exp_size,request->msg_size,
363 &(result->sec),&(result->bw));
/* The payload is the address of the result pointer. */
365 gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result);
368 gras_socket_close(peer);
/* NOTE(review): headerless fragment. Its error message names it
 * "grasbw_cbRequest" and it relies on identifiers (gras_msg_ctn, grasbw_test,
 * gras_msg_new_and_send, GRASMSG_BW_RESULT) that the amok_bw_* code above
 * never uses; presumably an older implementation kept for reference. Confirm
 * whether it is still compiled. Several lines are missing from this listing
 * (e.g. 380 -> 385, 391 -> 396). */
/* Target of the test, read from the first part of the message. */
375 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
376 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
/* Experiment sizes, read from the second part of the message. */
378 unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
379 unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
380 unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
/* Run the test; a non-zero code is stored in error->errcode and reported. */
385 if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
386 &(res[0].value),&(res[1].value) ))) {
388 "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
389 __FILE__,__LINE__,xbt_error_name(error->errcode));
/* NOTE(review): strncpy leaves the buffer unterminated when the source does
 * not fit; this assumes ERRMSG_LEN is larger than the literal. */
390 strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
391 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Both result entries receive a timestamp before the result is sent. */
396 res[0].timestamp = (unsigned int) gras_time();
397 res[1].timestamp = (unsigned int) gras_time();
398 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Callback registered for "SAT start"; the only visible statement logs that
 * it is not implemented.
 * NOTE(review): the message uses ';' after the function name where the
 * sibling amok_bw_cb_sat_begin uses ':'. The rest of the definition (second
 * parameter, return) is not visible in this listing. */
406 int amok_bw_cb_sat_start(gras_socket_t expeditor,
408 CRITICAL0("amok_bw_cb_sat_start; not implemented");
/* Callback registered for "SAT begin"; the only visible statement logs that
 * it is not implemented. The rest of the definition (second parameter,
 * return) is not visible in this listing. */
411 int amok_bw_cb_sat_begin(gras_socket_t expeditor,
413 CRITICAL0("amok_bw_cb_sat_begin: not implemented");
417 /* ***************************************************************************
419 * ***************************************************************************/
/* Asks the process at from_name:from_port to start saturating the link to
 * to_name:to_port: sends GRASMSG_SAT_START with the message size and timeout,
 * waits up to 120s for GRASMSG_SAT_STARTED and checks the error code carried
 * by the answer.
 *
 * NOTE(review): written against a different API (gras_sock_*, GRASMSG_*)
 * than the amok_bw_* functions above; presumably legacy code, confirm whether
 * it is still compiled. The embedded line numbers skip throughout (e.g.
 * 423 -> 432, 450 -> 453): declarations, return statements, the remaining
 * arguments of the send call and the closing braces are not visible. */
421 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
422 const char* to_name,unsigned int to_port,
423 unsigned int msgSize, unsigned int timeout) {
432 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
433 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
434 __FILE__,__LINE__,xbt_error_name(errcode));
/* NOTE(review): if the first malloc succeeds and the second fails, request is
 * still allocated when this branch is taken; no free is visible here. */
437 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
438 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
439 fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
440 gras_sock_close(sock);
444 request->timeout=timeout;
445 request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the capacity of target->host is not visible
 * here, so to_name is assumed to fit. */
447 strcpy(target->host,to_name);
448 target->port=to_port;
450 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2,
453 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
454 __FILE__,__LINE__,xbt_error_name(errcode));
455 gras_sock_close(sock);
458 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
459 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
460 __FILE__,__LINE__,xbt_error_name(errcode));
461 gras_sock_close(sock);
/* A non-zero error code in the answer means the peer could not start. */
465 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
466 fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
467 __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
468 gras_msg_free(answer);
469 gras_sock_close(sock);
/* Normal path: release the answer and the control socket. */
473 gras_msg_free(answer);
474 gras_sock_close(sock);
/* Handler for a saturation start request. Visible sequence: contact the peer
 * named in the message, send it GRASMSG_SAT_BEGIN, wait up to 120s for
 * GRASMSG_SAT_BEGUN, open a raw socket to the port given in that answer,
 * send one train of data, report GRASMSG_SAT_STARTED to the requester, then
 * keep sending until a GRASMSG_SAT_STOP arrives or the timeout elapses, and
 * finally exchange GRASMSG_SAT_END / GRASMSG_SAT_ENDED with the peer and
 * report GRASMSG_SAT_STOPPED.
 *
 * NOTE(review): written against a different API (gras_sock_*, gras_rawsock_*,
 * GRASMSG_*) than the amok_bw_* functions above; presumably legacy code,
 * confirm whether it is still compiled. The embedded line numbers skip
 * throughout (e.g. 478 -> 482, 490 -> 498, 588 -> 591): declarations, the
 * assignment of start, return statements, some call arguments and the closing
 * braces are not visible in this listing. */
478 int grasbw_cbSatStart(gras_msg_t *msg) {
482 double start; /* time to timeout */
484 /* specification of the test to run */
485 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
486 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
488 unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
489 unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
490 unsigned int raw_port;
/* Debugging dump of the requester's socket fields. */
498 fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
499 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
500 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
501 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
504 /* Negotiate the saturation with the peer */
505 if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
506 fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
507 xbt_error_name(errcode));
508 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
509 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
510 errcode,"Cannot contact peer.\n");
513 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
514 fprintf(stderr,"cbSatStart(): Malloc error\n");
515 gras_sock_close(sock);
516 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
517 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
518 malloc_error,"Cannot build request.\n");
/* Forward the timeout and message size received from the requester. */
522 request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
523 request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
525 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1,
527 fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
528 xbt_error_name(errcode));
529 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
530 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
531 errcode,"Cannot send request.\n");
532 gras_sock_close(sock);
536 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
537 fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
538 xbt_error_name(errcode));
539 gras_sock_close(sock);
541 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
542 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
544 "Cannot receive the ACK.\n");
/* A non-zero error code in the answer is forwarded to the requester. */
548 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
549 fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
550 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
552 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
553 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
555 "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
556 gras_msg_free(answer);
557 gras_sock_close(sock);
/* The second part of the answer carries the raw port to connect to. */
561 raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
563 if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
564 fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
565 xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
567 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
568 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
569 errcode,"Cannot open raw socket.\n");
570 gras_sock_close(sock);
574 /* send a train of data before reporting that XP is started */
575 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
576 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
577 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
578 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
579 errcode,"Cannot raw send.\n");
580 gras_sock_close(sock);
581 gras_rawsock_close(raw);
/* Success is reported through the same helper, with the no_error code. */
585 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
586 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
587 no_error,"Saturation started");
588 gras_msg_free(answer);
591 /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
/* gras_msg_wait() is called with a delay of 0 and stores into msg, so the
 * incoming SAT_STOP replaces the message this handler was called with. */
593 while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error &&
594 gras_time()-start < timeout) {
595 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
596 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
597 /* our error message does not interest anyone. SAT_STOP will do nothing. */
598 gras_sock_close(sock);
599 gras_rawsock_close(raw);
603 if (gras_time()-start > timeout) {
604 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
605 gras_sock_close(sock);
606 gras_rawsock_close(raw);
610 /* Handle the SAT_STOP which broke the previous while */
612 if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
613 fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
615 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
616 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
617 errcode,"Sending SAT_END to peer failed.\n");
618 gras_sock_close(sock);
619 gras_rawsock_close(raw);
623 if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
624 fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
626 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
627 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
628 errcode,"Receiving SAT_ENDED from peer failed.\n");
629 gras_sock_close(sock);
630 gras_rawsock_close(raw);
/* Final report to the requester, then release both sockets and the answer. */
633 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
634 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
637 gras_sock_close(sock);
638 gras_rawsock_close(raw);
639 gras_msg_free(answer);
/* Handler for GRASMSG_SAT_BEGIN on the receiving side of a saturation.
 * Visible sequence: allocate the answer parts, open a raw server socket
 * (called with 6666 and 8000, presumably a port range), answer
 * GRASMSG_SAT_BEGUN with the socket's port, then receive on the raw socket
 * until GRASMSG_SAT_END arrives or the timeout elapses, and report
 * GRASMSG_SAT_ENDED.
 *
 * NOTE(review): written against a different API (gras_rawsock_*, GRASMSG_*)
 * than the amok_bw_* functions above; presumably legacy code, confirm whether
 * it is still compiled. The embedded line numbers skip throughout (e.g.
 * 645 -> 648, 681 -> 687): declarations, the assignment of start, return
 * statements, some call arguments and the closing braces are not visible. */
645 int grasbw_cbSatBegin(gras_msg_t *msg) {
648 double start; /* timer */
650 unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
651 unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
/* NOTE(review): if the first malloc succeeds and the second fails, request is
 * still allocated when this branch is taken; no free is visible here. */
656 if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
657 !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
658 fprintf(stderr,"cbSatBegin(): Malloc error\n");
659 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
660 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
661 malloc_error,"Malloc error");
665 if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) {
666 fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
667 xbt_error_name(errcode));
668 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
669 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
670 errcode,"Cannot open raw socket");
/* Build the positive answer: the raw socket's port, the message size and an
 * empty error record. */
673 request->port=gras_rawsock_get_peer_port(raw);
674 request->msgSize=msgSize;
675 error->errcode=no_error;
676 error->errmsg[0]='\0';
677 if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
680 fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
681 xbt_error_name(errcode));
/* gras_msg_wait() is called with a delay of 0 and stores into msg, so the
 * incoming SAT_END replaces the message this handler was called with. */
687 while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
688 gras_time() - start < timeout) {
689 errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
690 if (errcode != timeout_error && errcode != no_error) {
691 fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
692 /* our error message does not interest anyone. SAT_END will do nothing. */
693 /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
697 if (gras_time()-start > timeout) {
698 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
699 gras_rawsock_close(raw);
/* Acknowledge the end to whoever sent SAT_END, then release the raw socket. */
703 grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
704 "cbSatBegin: Cannot send SAT_ENDED.\n",
706 gras_rawsock_close(raw);
/* Asks the process at from_name:from_port to stop a running saturation:
 * sends GRASMSG_SAT_STOP, waits up to 120s for GRASMSG_SAT_STOPPED and checks
 * the error code carried by the answer. to_name and to_port are not used by
 * any visible line.
 *
 * NOTE(review): written against a different API (gras_sock_*, GRASMSG_*)
 * than the amok_bw_* functions above; presumably legacy code, confirm whether
 * it is still compiled. The embedded line numbers skip (e.g. 712 -> 717,
 * 719 -> 723): declarations, return statements and the closing braces are not
 * visible in this listing. */
711 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
712 const char* to_name,unsigned int to_port) {
717 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
718 fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
719 xbt_error_name(errcode));
723 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
724 fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
725 xbt_error_name(errcode));
726 gras_sock_close(sock);
730 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
731 fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
732 xbt_error_name(errcode));
733 gras_sock_close(sock);
/* A non-zero error code in the answer means the peer failed to stop cleanly. */
737 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
738 fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
739 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
740 gras_msg_free(answer);
741 gras_sock_close(sock);
/* Normal path: release the answer and the control socket. */
745 gras_msg_free(answer);
746 gras_sock_close(sock);