3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
/* Logging category for this file: "bw", declared under "amok" with the
 * description "Bandwidth testing". Presumably the ERROR/VERB/DEBUG/CRITICAL
 * macros used below log to it -- TODO confirm against the xbt log headers. */
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
/* Module state flag: tested by amok_bw_init() and amok_bw_exit(), set to 1 by
 * the former and reset to 0 by the latter. */
15 static short _amok_bw_initialized = 0;
/* Module initialization.
 *
 * When the module flag is not yet set, this function:
 *   - builds three struct descriptions (bandwidth request, bandwidth result,
 *     saturation request) and wraps each of them in a reference type,
 *   - declares the bandwidth and saturation message types with those payloads,
 *   - registers the callbacks for "BW request", "BW handshake", "SAT start"
 *     and "SAT begin".
 * The flag is then set to 1.
 *
 * NOTE(review): the numbers embedded at the start of each line skip several
 * values (20-22, 24, 80, 82, ...), so some lines of this function, including
 * its closing braces, are not visible in this extract. Whether the final
 * assignment sits inside or after the if block cannot be told from here.
 */
18 void amok_bw_init(void) {
19 gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
/* Everything below is guarded by the module flag, so the declarations and
 * registrations are skipped when the flag is already set. */
23 if (! _amok_bw_initialized) {
25 /* Build the datatype descriptions */
/* Bandwidth request: a host plus three unsigned long sizes. */
26 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27 gras_datadesc_struct_append(bw_request_desc,"host",
28 gras_datadesc_by_name("xbt_host_t"));
29 gras_datadesc_struct_append(bw_request_desc,"buf_size",
30 gras_datadesc_by_name("unsigned long int"));
31 gras_datadesc_struct_append(bw_request_desc,"exp_size",
32 gras_datadesc_by_name("unsigned long int"));
33 gras_datadesc_struct_append(bw_request_desc,"msg_size",
34 gras_datadesc_by_name("unsigned long int"));
35 gras_datadesc_struct_close(bw_request_desc);
/* From here on bw_request_desc designates the reference type "bw_request_t"
 * built on top of the struct description, not the struct itself. */
36 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Bandwidth result: remote error, timestamp and two doubles (seconds, bw). */
38 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
39 gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
40 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43 gras_datadesc_struct_close(bw_res_desc);
44 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Saturation request: a host plus unsigned int msg_size and timeout.
 * NOTE(review): the struct is named "s_sat_request_desc_t" while its reference
 * is "sat_request_t"; the two descriptions above pair "s_X_t" with "X_t".
 * Confirm the name matches the C type declared in the private header. */
46 sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47 gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48 gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49 gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50 gras_datadesc_struct_close(sat_request_desc);
51 sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
53 /* Register the bandwidth messages */
/* The handshake and its ACK carry the same payload type as the request. */
54 gras_msgtype_declare("BW request", bw_request_desc);
55 gras_msgtype_declare("BW result", bw_res_desc);
56 gras_msgtype_declare("BW handshake", bw_request_desc);
57 gras_msgtype_declare("BW handshake ACK", bw_request_desc);
59 /* Register the saturation messages */
/* "SAT end" and "SAT stop" are declared with a NULL payload description
 * (presumably meaning no payload -- TODO confirm); the four acknowledgement
 * messages carry an "amok_remoterr_t". */
60 gras_msgtype_declare("SAT start", sat_request_desc);
61 gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
62 gras_msgtype_declare("SAT begin", sat_request_desc);
63 gras_msgtype_declare("SAT begun", gras_datadesc_by_name("amok_remoterr_t"));
64 gras_msgtype_declare("SAT end", NULL);
65 gras_msgtype_declare("SAT ended", gras_datadesc_by_name("amok_remoterr_t"));
66 gras_msgtype_declare("SAT stop", NULL);
67 gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
70 /* Register the callbacks */
/* These four registrations are the ones undone by amok_bw_exit(). */
71 gras_cb_register(gras_msgtype_by_name("BW request"),
72 &amok_bw_cb_bw_request);
73 gras_cb_register(gras_msgtype_by_name("BW handshake"),
74 &amok_bw_cb_bw_handshake);
76 gras_cb_register(gras_msgtype_by_name("SAT start"),
77 &amok_bw_cb_sat_start);
78 gras_cb_register(gras_msgtype_by_name("SAT begin"),
79 &amok_bw_cb_sat_begin);
/* Mark the module as initialized. */
81 _amok_bw_initialized =1;
/* Module finalization: unregisters the callbacks for "BW request",
 * "BW handshake", "SAT start" and "SAT begin", then resets the module flag
 * to 0.
 *
 * NOTE(review): the embedded line numbers jump from 85 to 88 right after the
 * if below, and the closing brace is not visible either. As the text stands
 * here, the if would govern the first gras_cb_unregister call, i.e. it would
 * run only when the module is NOT initialized. Presumably an early return was
 * dropped from this extract -- confirm against the complete file.
 */
84 void amok_bw_exit(void) {
85 if (! _amok_bw_initialized)
88 gras_cb_unregister(gras_msgtype_by_name("BW request"),
89 &amok_bw_cb_bw_request);
90 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
91 &amok_bw_cb_bw_handshake);
93 gras_cb_unregister(gras_msgtype_by_name("SAT start"),
94 &amok_bw_cb_sat_start);
95 gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
96 &amok_bw_cb_sat_begin);
/* Allow a later amok_bw_init() to run its setup again. */
98 _amok_bw_initialized = 0;
103 /* ***************************************************************************
105 * ***************************************************************************/
/**
 * \brief bandwidth measurement between localhost and @peer
 *
 * Results are reported in the last arguments, and sizes are in kb.
 */
/* Runs one bandwidth experiment between the local process and peer.
 *
 * Parameters:
 *   peer      socket on which the "BW handshake" message is sent
 *   buf_size  passed as is when opening the local server measurement socket;
 *             multiplied by 1024 for the request and the outgoing socket
 *   exp_size  multiplied by 1024 and used as experiment size
 *   msg_size  multiplied by 1024 and used as message size
 *   sec       OUT: written with gras_os_time() minus its previous content
 *   bw        OUT: written with exp_size (unscaled) divided by *sec
 *
 * Returns an xbt_error_t; the return statements themselves are not visible in
 * this extract.
 *
 * NOTE(review): the numbers embedded at the start of each line skip values
 * (121-122, 131-133, 167-168, 171-172, ...), so the declarations of port and
 * errcode, the returns, the assignment that must seed *sec with a start time,
 * and the closing braces are not shown here. The comments below describe only
 * the visible statements.
 */
112 xbt_error_t amok_bw_test(gras_socket_t peer,
113 unsigned long int buf_size,
114 unsigned long int exp_size,
115 unsigned long int msg_size,
116 /*OUT*/ double *sec, double *bw) {
118 /* Measurement sockets for the experiments */
119 gras_socket_t measMasterIn,measIn,measOut;
120 gras_socket_t sock_dummy; /* ignored arg to msg_wait */
123 bw_request_t request,request_ack;
/* Probe for a usable server port. The loop body is empty: the socket is opened
 * in the step expression, so the first port tried is 5001, and probing goes on
 * while the result is system_error and port is below 10000. */
125 for (port = 5000, errcode = system_error;
126 errcode == system_error && port < 10000;
127 errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
128 if (errcode != no_error) {
129 ERROR1("Error %s encountered while opening a measurement socket",
130 xbt_error_name(errcode));
/* Build the handshake request: sizes scaled by 1024, no host name, and the
 * port of the measurement socket opened above so the peer can connect back. */
134 request=xbt_new0(s_bw_request_t,1);
135 request->buf_size=buf_size*1024;
136 request->exp_size=exp_size*1024;
137 request->msg_size=msg_size*1024;
138 request->host.name = NULL;
139 request->host.port = gras_socket_my_port(measMasterIn);
/* NOTE(review): the message is labelled "expsize" but the two values passed
 * are buf_size and request->buf_size, and they are unsigned long printed with
 * the signed conversion. Confirm which size was meant. */
140 VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)",
141 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
142 buf_size,request->buf_size);
/* The address of the request pointer is what gets passed, in line with the
 * message payload being declared as a reference type in amok_bw_init(). */
144 if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
145 ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
/* Accept the connection the peer opens back to our server socket. */
148 TRY(gras_socket_meas_accept(measMasterIn,&measIn));
/* Wait for the ACK with a timeout argument of 60; it carries the port of the
 * measurement socket opened on the peer side. */
150 if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
151 &sock_dummy,&request_ack))) {
152 ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
153 xbt_error_name(errcode));
157 /* FIXME: What if there is a remote error? */
/* Open the outgoing measurement socket towards the port announced in the ACK. */
159 if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
160 request_ack->host.port,
161 request->buf_size,1,&measOut))) {
162 ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
163 xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
166 DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
/* The experiment itself: send exp_size on the outgoing socket, then receive
 * a (1,1) exchange on the incoming one, which the handshake callback sends
 * after its own receive. */
169 TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
170 TRY(gras_socket_meas_recv(measIn,120,1,1));
/* Presumably the failure path of the experiment: log and close all three
 * sockets. The condition controlling these lines is not visible here. */
173 ERROR1("Error %s encountered while sending the BW experiment.",
174 xbt_error_name(errcode));
175 gras_socket_close(measOut);
176 gras_socket_close(measMasterIn);
177 gras_socket_close(measIn);
/* Elapsed time, assuming *sec was loaded with a start timestamp in a line
 * that is not visible here -- TODO confirm. */
180 *sec = gras_os_time() - *sec;
/* Uses the unscaled exp_size parameter, not request->exp_size. */
181 *bw = ((double)exp_size) / *sec;
/* Close the accepted socket only when it differs from the master socket, so
 * the same socket is not closed twice. */
185 if (measIn != measMasterIn)
186 gras_socket_close(measIn);
187 gras_socket_close(measMasterIn);
188 gras_socket_close(measOut);
/* NOTE(review): no release of request or request_ack is visible in this
 * extract; check the complete file for a leak. */
/* Callback to the "BW handshake" message:
 *   opens a server measurement socket,
 *   indicates its port in a "BW handshake ACK" message,
 *   receives the corresponding data on the measurement socket,
 *   closes the measurement socket.
 *
 * Sizes are in bytes (converted from kb by the expeditor).
 */
/* Handler registered for the "BW handshake" message (see amok_bw_init).
 *
 * Visible behaviour: reads the request from payload, opens a local server
 * measurement socket, connects a client measurement socket back to the
 * expeditor, sends a "BW handshake ACK", accepts the incoming measurement
 * connection, receives the experiment data and sends a (1,1) exchange back,
 * then closes the sockets.
 *
 * NOTE(review): the numbers embedded at the start of each line skip values
 * (202, 205-208, 232, 243, 257-258, ...). The second parameter (used below as
 * payload), the declarations of answer, port and errcode, the port argument
 * of the connect-back call, the payload argument of the ACK, the return
 * statements and the closing braces are therefore not visible here.
 */
201 int amok_bw_cb_bw_handshake(gras_socket_t expeditor,
203 gras_socket_t measMasterIn,measIn,measOut;
/* payload is read as a pointer to a bw_request_t, which is itself used as a
 * pointer below. */
204 bw_request_t request=*(bw_request_t*)payload;
209 VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
210 gras_socket_peer_name(expeditor),request->host.port,
211 request->buf_size,request->exp_size,request->msg_size);
213 /* Build our answer */
214 answer = xbt_new0(s_bw_request_t,1);
/* Probe for a usable server port; the socket is opened in the step expression,
 * so the first port tried is 6001.
 * NOTE(review): unlike the equivalent loop in amok_bw_test there is no upper
 * bound on port, so the loop only ends when the call stops returning
 * system_error. */
216 for (port = 6000, errcode = system_error;
217 errcode == system_error;
218 errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
219 if (errcode != no_error) {
220 ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
221 /* FIXME: tell error to remote */
/* The answer echoes the requested sizes and announces our own port. */
225 answer->buf_size=request->buf_size;
226 answer->exp_size=request->exp_size;
227 answer->msg_size=request->msg_size;
228 answer->host.port=gras_socket_my_port(measMasterIn);
230 /* Don't connect asap to leave time to other side to enter the accept() */
/* Connect back to the expeditor. The port argument line is not visible; the
 * error message below reports request->host.port. */
231 if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
233 request->buf_size,1,&measOut))) {
234 ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d",
235 xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
236 /* FIXME: tell error to remote */
/* Send the ACK; on failure both sockets opened so far are closed. */
241 if ((errcode=gras_msg_send(expeditor,
242 gras_msgtype_by_name("BW handshake ACK"),
244 ERROR1("Error %s encountered while sending the answer.",
245 xbt_error_name(errcode));
246 gras_socket_close(measMasterIn);
247 gras_socket_close(measOut);
248 /* FIXME: tell error to remote */
/* Accept the measurement connection opened by the expeditor. */
251 TRY(gras_socket_meas_accept(measMasterIn,&measIn));
252 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
253 answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
/* Mirror of the calls in amok_bw_test: receive the experiment first, then
 * send the (1,1) exchange back. */
255 TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
256 TRY(gras_socket_meas_send(measOut,120,1,1));
/* Presumably the failure path of the experiment; the controlling condition is
 * not visible, and the FIXME line below has lost its comment delimiters in
 * this extract. */
259 ERROR1("Error %s encountered while receiving the experiment.",
260 xbt_error_name(errcode));
261 gras_socket_close(measMasterIn);
262 gras_socket_close(measIn);
263 gras_socket_close(measOut);
264 * FIXME: tell error to remote ? *
/* Close the master socket only when it differs from the accepted socket, so
 * the same socket is not closed twice. */
268 if (measIn != measMasterIn)
269 gras_socket_close(measMasterIn);
270 gras_socket_close(measIn);
271 gras_socket_close(measOut);
274 DEBUG0("BW experiment done.");
/* Handler registered for the "BW request" message (see amok_bw_init).
 * Placeholder: the only visible statement logs "Not implemented" at critical
 * level. The second parameter, the return statement and the closing brace are
 * not visible in this extract. */
278 int amok_bw_cb_bw_request(gras_socket_t expeditor,
280 CRITICAL0("Not implemented");
/* Handler registered for the "SAT start" message (see amok_bw_init).
 * Placeholder: the only visible statement logs "Not implemented" at critical
 * level. The second parameter, the return statement and the closing brace are
 * not visible in this extract. */
284 int amok_bw_cb_sat_start(gras_socket_t expeditor,
286 CRITICAL0("Not implemented");
/* Handler registered for the "SAT begin" message (see amok_bw_init).
 * Placeholder: the only visible statement logs "Not implemented" at critical
 * level. The second parameter, the return statement and the closing brace are
 * not visible in this extract. */
289 int amok_bw_cb_sat_begin(gras_socket_t expeditor,
291 CRITICAL0("Not implemented");
296 /* function to request a BW test between two external hosts */
/* Asks the process listening on from_name:from_port to run a bandwidth test
 * against to_name:to_port and reports the outcome.
 *
 * Parameters:
 *   from_name, from_port  process contacted with the request
 *   to_name, to_port      target copied into the msgHost_t sent along
 *   bufSize, expSize, msgSize  copied unchanged into the BwExp_t request
 *   sec, bw               OUT: filled from the two msgResult_t values of the
 *                         answer
 *
 * NOTE(review): this function uses another API family (gras_sock_ functions,
 * GRASMSG_ constants, gras_msg_ctn) than the amok_bw_ functions above; it may
 * be legacy or disabled code -- confirm in the complete file. The embedded
 * line numbers skip values (301-308, 312-313, 328-329, ...), so declarations,
 * return statements, the remaining arguments of the send call and the closing
 * braces are not visible here.
 */
297 xbt_error_t grasbw_request(const char* from_name,unsigned int from_port,
298 const char* to_name,unsigned int to_port,
299 unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
300 /*OUT*/ double *sec, double*bw) {
/* Contact the process that will run the test. */
309 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
310 fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
311 xbt_error_name(errcode));
/* NOTE(review): if the first allocation succeeds and the second fails, the
 * only cleanup visible is closing the socket, so request would leak. The
 * message also names grasbw_test() although this is grasbw_request(). */
314 if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
315 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
316 fprintf(stderr,"grasbw_test(): Malloc error\n");
317 gras_sock_close(sock);
321 request->bufSize=bufSize;
322 request->expSize=expSize;
323 request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the size of target->host is not visible here,
 * so a long to_name may overflow it -- confirm. */
324 strcpy(target->host,to_name);
325 target->port=to_port;
/* Send the request (the payload arguments are on lines not shown). */
327 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2,
330 fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
331 xbt_error_name(errcode));
332 gras_sock_close(sock);
/* Wait for the result with a timeout argument of 240. */
335 if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
336 fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
337 xbt_error_name(errcode));
338 gras_sock_close(sock);
/* A non-zero errcode in the answer means the remote side reported a failure. */
342 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
343 fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
344 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
345 gras_msg_free(answer);
346 gras_sock_close(sock);
350 /* fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t)); */
/* Results: first msgResult_t value is the duration, second the bandwidth. */
351 *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
352 *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
354 gras_msg_free(answer);
355 gras_sock_close(sock);
/* Handler for a bandwidth request message: extracts the target host and the
 * experiment sizes from msg, runs grasbw_test() against that target and sends
 * a GRASMSG_BW_RESULT message back on msg->sock, both on failure and on
 * success.
 *
 * NOTE(review): the embedded line numbers skip values (363, 367-370, 374-376,
 * 379, 384-387, 391-396), so the declarations of error and res, the start of
 * the error print, the payload arguments of both send calls, the return
 * statements and the closing braces are not visible here.
 */
359 int grasbw_cbBWRequest(gras_msg_t *msg) {
360 /* specification of the test to run */
361 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
362 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
364 unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
365 unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
366 unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
/* One error record and an array of two results (duration and bandwidth).
 * NOTE(review): if the first allocation succeeds and the second fails, no
 * release of error is visible. */
371 if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
372 !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
373 fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
/* Run the test; its status is stored directly in the error record, and the
 * two measured values land in res[0] and res[1]. */
377 if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
378 &(res[0].value),&(res[1].value) ))) {
380 "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
381 __FILE__,__LINE__,xbt_error_name(error->errcode));
/* Failure path: fill in the message and send the result back anyway. */
382 strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
383 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Success path: stamp both results with the current time and send them. */
388 res[0].timestamp = (unsigned int) gras_time();
389 res[1].timestamp = (unsigned int) gras_time();
390 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
397 /* ***************************************************************************
399 * ***************************************************************************/
/* Asks the process listening on from_name:from_port to start saturating the
 * link towards to_name:to_port, and waits for its acknowledgement.
 *
 * Parameters:
 *   from_name, from_port  process contacted with the request
 *   to_name, to_port      target copied into the msgHost_t sent along
 *   msgSize, timeout      copied unchanged into the SatExp_t request
 *
 * NOTE(review): the embedded line numbers skip values (404-411, 415-416,
 * 431-432, ...), so declarations, return statements, the payload arguments of
 * the send call and the closing braces are not visible here.
 */
401 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
402 const char* to_name,unsigned int to_port,
403 unsigned int msgSize, unsigned int timeout) {
/* Contact the process that will emit the saturating traffic. */
412 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
413 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
414 __FILE__,__LINE__,xbt_error_name(errcode));
/* NOTE(review): if the first allocation succeeds and the second fails, the
 * only cleanup visible is closing the socket, so request would leak. */
417 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
418 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
419 fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
420 gras_sock_close(sock);
424 request->timeout=timeout;
425 request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the size of target->host is not visible here
 * -- confirm that to_name always fits. */
427 strcpy(target->host,to_name);
428 target->port=to_port;
/* Send the start request (payload arguments are on lines not shown). */
430 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2,
433 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
434 __FILE__,__LINE__,xbt_error_name(errcode));
435 gras_sock_close(sock);
/* Wait for the acknowledgement with a timeout argument of 120. */
438 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
439 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
440 __FILE__,__LINE__,xbt_error_name(errcode));
441 gras_sock_close(sock);
/* A non-zero errcode in the answer means the remote side reported a failure. */
445 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
446 fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
447 __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
448 gras_msg_free(answer);
449 gras_sock_close(sock);
453 gras_msg_free(answer);
454 gras_sock_close(sock);
/* Handler for a saturation start message. Visible sequence:
 *   1. open a socket to the target taken from msg and send it SAT_BEGIN,
 *   2. wait for SAT_BEGUN and read from it the port of a raw socket,
 *   3. open that raw socket and send a first train of data,
 *   4. report to the requester (msg->sock) that the saturation started,
 *   5. keep sending on the raw socket until a SAT_STOP message arrives or the
 *      timeout elapses,
 *   6. on stop, send SAT_END to the target, wait for SAT_ENDED and report
 *      SAT_STOPPED to the requester.
 * Each failing step is printed on stderr and, up to step 4 and in step 6,
 * reported to the requester through grasRepportError().
 *
 * NOTE(review): the embedded line numbers skip many values (459-461, 471-477,
 * 491-492, 506, 572, 615-616, ...), so the declarations of sock, raw, request,
 * answer and errcode, the return statements, several call arguments, the
 * assignment of start and the closing braces are not visible here.
 */
458 int grasbw_cbSatStart(gras_msg_t *msg) {
462 double start; /* time to timeout */
464 /* specification of the test to run */
465 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
466 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
468 unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
469 unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
470 unsigned int raw_port;
/* Debug trace of the requester socket fields, printed unconditionally as far
 * as the visible lines show. */
478 fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
479 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
480 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
481 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
484 /* Negociate the saturation with the peer */
485 if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
486 fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
487 xbt_error_name(errcode));
488 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
489 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
490 errcode,"Cannot contact peer.\n");
493 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
494 fprintf(stderr,"cbSatStart(): Malloc error\n");
495 gras_sock_close(sock);
496 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
497 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
498 malloc_error,"Cannot build request.\n");
/* Forward the timeout and message size received from the requester. */
502 request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
503 request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
505 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1,
507 fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
508 xbt_error_name(errcode));
509 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
510 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
511 errcode,"Cannot send request.\n");
512 gras_sock_close(sock);
/* Wait for the target to acknowledge, with a timeout argument of 120. */
516 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
517 fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
518 xbt_error_name(errcode));
519 gras_sock_close(sock);
521 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
522 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
524 "Cannot receive the ACK.\n");
/* A non-zero errcode in the answer means the target reported a failure; it is
 * relayed to the requester. */
528 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
529 fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
530 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
532 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
533 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
535 "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
536 gras_msg_free(answer);
537 gras_sock_close(sock);
/* Port of the raw socket announced by the target in its answer. */
541 raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
543 if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
544 fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
545 xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
547 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
548 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
549 errcode,"Cannot open raw socket.\n");
550 gras_sock_close(sock);
554 /* send a train of data before repporting that XP is started */
555 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
556 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
557 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
558 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
559 errcode,"Cannot raw send.\n");
560 gras_sock_close(sock);
561 gras_rawsock_close(raw);
/* Success is reported through the same helper, with no_error as status. */
565 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
566 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
567 no_error,"Saturation started");
568 gras_msg_free(answer);
571 /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
/* The wait uses a timeout argument of 0, so it acts as a poll: the loop goes
 * on while it returns timeout_error and the elapsed time is below timeout.
 * Any other return value, not only a received SAT_STOP, ends the loop.
 * The received message is stored into msg, replacing the handler argument.
 * NOTE(review): the assignment of start is on a line not visible here. */
573 while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error &&
574 gras_time()-start < timeout) {
575 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
576 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
577 /* our error message do not interess anyone. SAT_STOP will do nothing. */
578 gras_sock_close(sock);
579 gras_rawsock_close(raw);
/* Timed out without a stop request: close both sockets. */
583 if (gras_time()-start > timeout) {
584 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
585 gras_sock_close(sock);
586 gras_rawsock_close(raw);
590 /* Handle the SAT_STOP which broke the previous while */
/* From here on msg designates the message stored by the polling loop above,
 * so the reports go to its socket. */
592 if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
593 fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
595 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
596 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
597 errcode,"Sending SAT_END to peer failed.\n");
598 gras_sock_close(sock);
599 gras_rawsock_close(raw);
/* Wait for the target to confirm the end, with a timeout argument of 60. */
603 if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
604 fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
606 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
607 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
608 errcode,"Receiving SAT_ENDED from peer failed.\n");
609 gras_sock_close(sock);
610 gras_rawsock_close(raw);
/* Final report to the requester (status arguments are on lines not shown),
 * then release the sockets and the last answer. */
613 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
614 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
617 gras_sock_close(sock);
618 gras_rawsock_close(raw);
619 gras_msg_free(answer);
/* Handler for a saturation begin message (receiving side). Visible sequence:
 *   1. allocate the answer records and open a raw server socket,
 *   2. send SAT_BEGUN back on msg->sock with the port to use,
 *   3. receive on the raw socket until a SAT_END message arrives or the
 *      timeout elapses,
 *   4. report SAT_ENDED and close the raw socket.
 *
 * NOTE(review): the embedded line numbers skip values (626-627, 629, 632-635,
 * 642-644, 658-659, 662-666, 685, ...), so the declarations of raw, request,
 * error and errcode, the return statements, the payload arguments of the send
 * call, the assignment of start and the closing braces are not visible here.
 */
625 int grasbw_cbSatBegin(gras_msg_t *msg) {
628 double start; /* timer */
630 unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
631 unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
/* NOTE(review): if the first allocation succeeds and the second fails, no
 * release of request is visible. */
636 if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
637 !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
638 fprintf(stderr,"cbSatBegin(): Malloc error\n");
639 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
640 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
641 malloc_error,"Malloc error");
/* Presumably 6666 and 8000 bound the port range to try -- TODO confirm
 * against the declaration of gras_rawsock_server_open. */
645 if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) {
646 fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
647 xbt_error_name(errcode));
648 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
649 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
650 errcode,"Cannot open raw socket");
/* Answer: the port to connect to, the message size, and an empty error.
 * NOTE(review): the port comes from a function named get_peer_port although
 * raw is the local server socket -- confirm this returns the listening port. */
653 request->port=gras_rawsock_get_peer_port(raw);
654 request->msgSize=msgSize;
655 error->errcode=no_error;
656 error->errmsg[0]='\0';
657 if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
660 fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
661 xbt_error_name(errcode));
/* The wait uses a timeout argument of 0, so it acts as a poll: the loop goes
 * on while it returns timeout_error and the elapsed time is below timeout.
 * The received message is stored into msg, replacing the handler argument.
 * NOTE(review): the assignment of start is on a line not visible here. */
667 while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
668 gras_time() - start < timeout) {
/* A receive that times out is tolerated; only other errors are printed. */
669 errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
670 if (errcode != timeout_error && errcode != no_error) {
671 fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
672 /* our error message do not interess anyone. SAT_END will do nothing. */
673 /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
/* Timed out without an end request: close the raw socket. */
677 if (gras_time()-start > timeout) {
678 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
679 gras_rawsock_close(raw);
/* Acknowledge the end on the socket of the message stored by the polling loop
 * (status arguments are on a line not shown), then close the raw socket. */
683 grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
684 "cbSatBegin: Cannot send SAT_ENDED.\n",
686 gras_rawsock_close(raw);
/* Asks the process listening on from_name:from_port to stop a running
 * saturation and waits for its acknowledgement.
 *
 * Parameters:
 *   from_name, from_port  process contacted with the stop request
 *   to_name, to_port      not used by any visible line of this function
 *
 * NOTE(review): the embedded line numbers skip values (693-696, 700-702,
 * 707-709, ...), so the declarations of sock, answer and errcode, the return
 * statements and the closing braces are not visible here.
 */
691 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
692 const char* to_name,unsigned int to_port) {
/* Contact the process that emits the saturating traffic. */
697 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
698 fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
699 xbt_error_name(errcode));
/* The stop request is sent with a count argument of 0. */
703 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
704 fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
705 xbt_error_name(errcode));
706 gras_sock_close(sock);
/* Wait for the acknowledgement with a timeout argument of 120. */
710 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
711 fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
712 xbt_error_name(errcode));
713 gras_sock_close(sock);
/* A non-zero errcode in the answer means the remote side reported a failure. */
717 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
718 fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
719 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
720 gras_msg_free(answer);
721 gras_sock_close(sock);
725 gras_msg_free(answer);
726 gras_sock_close(sock);