3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
/* Module state flag: set to 1 at the end of amok_bw_init() and reset to 0 by
 * amok_bw_exit(); both functions test it before doing their work. */
15 static short _amok_bw_initialized = 0;
17 /** @brief module initialization; all participating nodes must run this
 *
 * When the module is not initialized yet, this describes the three payload
 * types (bandwidth request, bandwidth result, saturation request), declares
 * the "BW ..." and "SAT ..." message types, registers the four callbacks and
 * finally raises _amok_bw_initialized.
 */
18 void amok_bw_init(void) {
19 gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
23 if (! _amok_bw_initialized) {
25 /* Build the datatype descriptions */
/* Bandwidth request: the host to contact plus the three experiment sizes. */
26 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27 gras_datadesc_struct_append(bw_request_desc,"host",
28 gras_datadesc_by_name("xbt_host_t"));
29 gras_datadesc_struct_append(bw_request_desc,"buf_size",
30 gras_datadesc_by_name("unsigned long int"));
31 gras_datadesc_struct_append(bw_request_desc,"exp_size",
32 gras_datadesc_by_name("unsigned long int"));
33 gras_datadesc_struct_append(bw_request_desc,"msg_size",
34 gras_datadesc_by_name("unsigned long int"));
35 gras_datadesc_struct_close(bw_request_desc);
/* From here on bw_request_desc designates the "bw_request_t" type that
 * gras_datadesc_ref() builds from the struct description (presumably a
 * reference/pointer to it -- TODO confirm). Same pattern for the two
 * descriptions below. */
36 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
/* Bandwidth result.
 * NOTE(review): the member is registered as "seconds" while
 * amok_bw_cb_bw_request() writes result->sec; confirm both agree with the
 * s_bw_res_t declaration. */
38 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
39 gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
40 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43 gras_datadesc_struct_close(bw_res_desc);
44 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Saturation request.
 * NOTE(review): the struct is named "s_sat_request_desc_t", which does not
 * follow the "s_<name>_t" / "<name>_t" pairing used for the two types above
 * ("sat_request_t" is the reference name) -- confirm this is intended. */
46 sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47 gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48 gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49 gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50 gras_datadesc_struct_close(sat_request_desc);
51 sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
53 /* Register the bandwidth messages */
/* The handshake, its ACK and the request all carry a bw_request_t payload;
 * only the result carries a bw_res_t. */
54 gras_msgtype_declare("BW handshake", bw_request_desc);
55 gras_msgtype_declare("BW handshake ACK", bw_request_desc);
56 gras_msgtype_declare("BW request", bw_request_desc);
57 gras_msgtype_declare("BW result", bw_res_desc);
59 /* Register the saturation messages */
/* "SAT end" and "SAT stop" are declared with a NULL payload description;
 * each acknowledgement carries an "amok_remoterr_t". */
60 gras_msgtype_declare("SAT start", sat_request_desc);
61 gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
62 gras_msgtype_declare("SAT begin", sat_request_desc);
63 gras_msgtype_declare("SAT begun", gras_datadesc_by_name("amok_remoterr_t"));
64 gras_msgtype_declare("SAT end", NULL);
65 gras_msgtype_declare("SAT ended", gras_datadesc_by_name("amok_remoterr_t"));
66 gras_msgtype_declare("SAT stop", NULL);
67 gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
70 /* Register the callbacks */
/* Only the four messages that start an exchange get a callback. The
 * "BW handshake ACK" and "BW result" answers are waited for explicitly with
 * gras_msg_wait() in amok_bw_test() and amok_bw_request(). */
71 gras_cb_register(gras_msgtype_by_name("BW request"),
72 &amok_bw_cb_bw_request);
73 gras_cb_register(gras_msgtype_by_name("BW handshake"),
74 &amok_bw_cb_bw_handshake);
76 gras_cb_register(gras_msgtype_by_name("SAT start"),
77 &amok_bw_cb_sat_start);
78 gras_cb_register(gras_msgtype_by_name("SAT begin"),
79 &amok_bw_cb_sat_begin);
/* Remember that the module is set up. */
81 _amok_bw_initialized =1;
84 /** @brief module finalization
 *
 * Unregisters the four callbacks installed by amok_bw_init() ("BW request",
 * "BW handshake", "SAT start", "SAT begin") and clears _amok_bw_initialized.
 */
85 void amok_bw_exit(void) {
/* Presumably nothing has to be undone when the module was never initialized
 * -- TODO confirm what this guard controls. */
86 if (! _amok_bw_initialized)
89 gras_cb_unregister(gras_msgtype_by_name("BW request"),
90 &amok_bw_cb_bw_request);
91 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
92 &amok_bw_cb_bw_handshake);
94 gras_cb_unregister(gras_msgtype_by_name("SAT start"),
95 &amok_bw_cb_sat_start);
96 gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
97 &amok_bw_cb_sat_begin);
/* Allow a later amok_bw_init() to set the module up again. */
99 _amok_bw_initialized = 0;
102 /* ***************************************************************************
104 * ***************************************************************************/
107 * \brief bandwidth measurement between localhost and \e peer
109 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
110 * \arg buf_size: Size of the socket buffer
111 * \arg exp_size: Total size of data sent across the network
112 * \arg msg_size: Size of each message sent, i.e. (\e exp_size / \e msg_size) messages will be sent.
113 * \arg sec: where the result (in seconds) should be stored.
114 * \arg bw: observed Bandwidth (in kb/s)
116 * Conduct a bandwidth test from the local process to the given peer.
117 * This call is blocking until the end of the experiment.
119 * Results are reported in last args, and sizes are in kb.
121 xbt_error_t amok_bw_test(gras_socket_t peer,
122 unsigned long int buf_size,
123 unsigned long int exp_size,
124 unsigned long int msg_size,
125 /*OUT*/ double *sec, double *bw) {
127 /* Measurement sockets for the experiments */
128 gras_socket_t measMasterIn,measIn,measOut;
131 bw_request_t request,request_ack;
/* Look for a usable local port for the measurement server socket: ports
 * 5001 up to 10000 are tried in turn for as long as
 * gras_socket_server_ext() answers system_error. The loop body is empty;
 * all the work happens in the increment expression. */
133 for (port = 5000, errcode = system_error;
134 errcode == system_error && port < 10000;
135 errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
136 if (errcode != no_error) {
137 ERROR1("Error %s encountered while opening a measurement socket",
138 xbt_error_name(errcode));
/* Build the handshake request. The caller gives sizes in kb; they are sent
 * in bytes (hence the * 1024). Only the port of our measurement server is
 * filled in, the host name is left NULL.
 * NOTE(review): no release of `request` is visible below -- confirm that
 * it is freed. */
142 request=xbt_new0(s_bw_request_t,1);
143 request->buf_size=buf_size*1024;
144 request->exp_size=exp_size*1024;
145 request->msg_size=msg_size*1024;
146 request->host.name = NULL;
147 request->host.port = gras_socket_my_port(measMasterIn);
/* NOTE(review): the message is labelled "expsize" but the two values
 * printed are buf_size and request->buf_size; %ld is also used for
 * unsigned long values. */
148 VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)",
149 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
150 buf_size,request->buf_size);
/* The payload handed to gras_msg_send() is the address of the request
 * pointer, matching the callbacks which read *(bw_request_t*)payload. */
152 if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
153 ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
/* Let the peer connect back on our measurement server socket... */
156 TRY(gras_socket_meas_accept(measMasterIn,&measIn));
/* ...then wait (timeout argument 60) for its ACK, which carries the port
 * of the peer's own measurement server. */
158 if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
159 NULL,&request_ack))) {
160 ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
161 xbt_error_name(errcode));
165 /* FIXME: What if there is a remote error? */
/* Open the outgoing measurement socket towards the port announced in the
 * ACK, using the buffer size in bytes. */
167 if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
168 request_ack->host.port,
169 request->buf_size,1,&measOut))) {
170 ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
171 xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
174 DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
/* The experiment itself: push exp_size bytes to the peer with msg_size as
 * the per-message size, then wait for the peer's short (1,1) reply on the
 * incoming measurement socket. */
177 TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
178 TRY(gras_socket_meas_recv(measIn,120,1,1));
181 ERROR1("Error %s encountered while sending the BW experiment.",
182 xbt_error_name(errcode));
183 gras_socket_close(measOut);
184 gras_socket_close(measMasterIn);
185 gras_socket_close(measIn);
/* Elapsed time: current gras_os_time() minus the value held in *sec
 * (presumably the start timestamp stored before the transfer -- TODO
 * confirm). The bandwidth is computed from the caller's exp_size, i.e. the
 * size in kb, not from the byte count that was sent.
 * NOTE(review): nothing guards against a zero elapsed time here. */
188 *sec = gras_os_time() - *sec;
189 *bw = ((double)exp_size) / *sec;
/* Release the three sockets; measIn is closed separately only when it is a
 * different socket from measMasterIn. */
193 if (measIn != measMasterIn)
194 gras_socket_close(measIn);
195 gras_socket_close(measMasterIn);
196 gras_socket_close(measOut);
201 /* Callback to the "BW handshake" message:
202 opens a server measurement socket,
203 indicates its port in a "BW handshake ACK" message,
204 receives the corresponding data on the measurement socket,
205 closes the measurement socket
207 sizes are in bytes (they got converted from kb by the expeditor)
209 int amok_bw_cb_bw_handshake(gras_socket_t expeditor,
211 gras_socket_t measMasterIn,measIn,measOut;
212 bw_request_t request=*(bw_request_t*)payload;
217 VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
218 gras_socket_peer_name(expeditor),request->host.port,
219 request->buf_size,request->exp_size,request->msg_size);
221 /* Build our answer */
/* NOTE(review): no release of `answer` is visible below -- confirm that it
 * is freed on every path. */
222 answer = xbt_new0(s_bw_request_t,1);
/* Open our own measurement server socket, trying ports from 6001 upwards
 * for as long as gras_socket_server_ext() answers system_error.
 * NOTE(review): unlike the equivalent loop in amok_bw_test(), this one has
 * no upper bound on the port number. */
224 for (port = 6000, errcode = system_error;
225 errcode == system_error;
226 errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
227 if (errcode != no_error) {
228 ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
229 /* FIXME: tell error to remote */
/* The answer echoes the requested sizes and announces the port of the
 * measurement server socket opened above. */
233 answer->buf_size=request->buf_size;
234 answer->exp_size=request->exp_size;
235 answer->msg_size=request->msg_size;
236 answer->host.port=gras_socket_my_port(measMasterIn);
238 /* Don't connect asap to leave time to other side to enter the accept() */
/* Connect back to the requester's measurement socket (the error message
 * below names request->host.port as the target). */
239 if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
241 request->buf_size,1,&measOut))) {
242 ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d",
243 xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
244 /* FIXME: tell error to remote */
/* Send the ACK on the regular socket; on failure both measurement sockets
 * opened so far are closed. */
249 if ((errcode=gras_msg_send(expeditor,
250 gras_msgtype_by_name("BW handshake ACK"),
252 ERROR1("Error %s encountered while sending the answer.",
253 xbt_error_name(errcode));
254 gras_socket_close(measMasterIn);
255 gras_socket_close(measOut);
256 /* FIXME: tell error to remote */
/* Accept the requester's connection on our measurement server socket. */
259 TRY(gras_socket_meas_accept(measMasterIn,&measIn));
260 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
261 answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
/* Mirror image of amok_bw_test(): receive the experiment data first, then
 * send the short (1,1) reply the requester is waiting for. */
263 TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
264 TRY(gras_socket_meas_send(measOut,120,1,1));
267 ERROR1("Error %s encountered while receiving the experiment.",
268 xbt_error_name(errcode));
269 gras_socket_close(measMasterIn);
270 gras_socket_close(measIn);
271 gras_socket_close(measOut);
272 * FIXME: tell error to remote ? *
/* NOTE(review): here the guard protects the close of measMasterIn and
 * measIn is always closed, whereas amok_bw_test() guards the close of
 * measIn and always closes measMasterIn. Presumably equivalent -- confirm. */
276 if (measIn != measMasterIn)
277 gras_socket_close(measMasterIn);
278 gras_socket_close(measIn);
279 gras_socket_close(measOut);
282 DEBUG0("BW experiment done.");
287 * \brief request a bandwidth measurement between two remote hosts
289 * \arg from_name: Name of the first host
290 * \arg from_port: port on which the first process is listening for messages
291 * \arg to_name: Name of the second host
292 * \arg to_port: port on which the second process is listening (for messages, do not
293 * give a measurement socket here. The needed measurement sockets will be created
294 * automatically and negotiated between the peers)
295 * \arg buf_size: Size of the socket buffer
296 * \arg exp_size: Total size of data sent across the network
297 * \arg msg_size: Size of each message sent. (\e exp_size / \e msg_size) messages will be sent.
298 * \arg sec: where the result (in seconds) should be stored.
299 * \arg bw: observed Bandwidth (in kb/s)
301 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
302 * This call is blocking until the end of the experiment.
304 * Results are reported in last args, and sizes are in kb.
306 xbt_error_t amok_bw_request(const char* from_name,unsigned int from_port,
307 const char* to_name,unsigned int to_port,
308 unsigned long int buf_size,
309 unsigned long int exp_size,
310 unsigned long int msg_size,
311 /*OUT*/ double *sec, double*bw) {
316 bw_request_t request;
/* Describe the experiment. The sizes are forwarded exactly as received (in
 * kb); the conversion to bytes is done by amok_bw_test() on the process
 * that runs the test.
 * NOTE(review): no release of `request` is visible below -- confirm that
 * it is freed. */
319 request=xbt_new0(s_bw_request_t,1);
320 request->buf_size=buf_size;
321 request->exp_size=exp_size;
322 request->msg_size=msg_size;
/* The request designates the second host; the cast only drops the const
 * qualifier, the string itself is not copied. */
324 request->host.name = (char*)to_name;
325 request->host.port = to_port;
/* Contact the first host, hand it the request (the address of the request
 * pointer is the payload) and wait (timeout argument 240) for the
 * "BW result" answer. */
327 TRY(gras_socket_client(from_name,from_port,&sock));
328 TRY(gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request));
331 TRY(gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result));
336 VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
337 from_name,from_port, to_name,to_port,
340 gras_socket_close(sock);
/* Callback for the "BW request" message (registered in amok_bw_init()):
 * connects to the host named in the request, runs amok_bw_test() against it
 * with the requested sizes and sends the measured values back to the
 * expeditor in a "BW result" message. */
345 int amok_bw_cb_bw_request(gras_socket_t expeditor,
349 /* specification of the test to run, and our answer */
350 bw_request_t request = *(bw_request_t*)payload;
/* NOTE(review): the allocation names the type `s_bw_res`, while the data
 * description in amok_bw_init() is called "s_bw_res_t" -- confirm that this
 * type name exists. No release of `result` is visible below either. */
351 bw_res_t result = xbt_new0(s_bw_res,1);
/* Regular (message) socket to the target of the test. */
354 TRY(gras_socket_client(request->host.name,request->host.port,&peer));
/* The test stores its duration and bandwidth directly into the result. */
355 TRY(amok_bw_test(peer,
356 request->buf_size,request->exp_size,request->msg_size,
357 &(result->sec),&(result->bw)));
/* As elsewhere in this file, the payload is the address of the pointer. */
359 TRY(gras_msg_send(expeditor,gras_msgtype_by_name("BW result"),&result));
362 gras_socket_close(peer);
/* NOTE(review): the statements below read the target (msgHost_t) and the
 * experiment sizes (BwExp_t) out of a message, run grasbw_test() and send a
 * GRASMSG_BW_RESULT back on msg->sock, both in the error branch and after
 * the timestamps are filled in. The enclosing function header is not part
 * of this excerpt; the error string suggests it is grasbw_cbRequest -- TODO
 * confirm. They rely on gras_msg_ctn / gras_msg_new_and_send rather than on
 * the gras_msg_send calls used by amok_bw_cb_bw_request() above. */
369 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
370 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
372 unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
373 unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
374 unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
/* res[0] and res[1] receive the two values measured by the test. */
379 if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
380 &(res[0].value),&(res[1].value) ))) {
382 "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
383 __FILE__,__LINE__,xbt_error_name(error->errcode));
/* NOTE(review): strncpy() leaves errmsg without a terminating NUL when the
 * source does not fit; with this 24-character literal that only happens if
 * ERRMSG_LEN is smaller than 25. */
384 strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
385 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Both results are stamped with the current time before being sent. */
390 res[0].timestamp = (unsigned int) gras_time();
391 res[1].timestamp = (unsigned int) gras_time();
392 gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
/* Callback for the "SAT start" message (registered in amok_bw_init()).
 * The only visible statement logs, at critical level, that it is not
 * implemented. */
400 int amok_bw_cb_sat_start(gras_socket_t expeditor,
402 CRITICAL0("amok_bw_cb_sat_start; not implemented");
/* Callback for the "SAT begin" message (registered in amok_bw_init()).
 * The only visible statement logs, at critical level, that it is not
 * implemented. */
405 int amok_bw_cb_sat_begin(gras_socket_t expeditor,
407 CRITICAL0("amok_bw_cb_sat_begin: not implemented");
411 /* ***************************************************************************
413 * ***************************************************************************/
/* grasbw_saturate_start: ask the process at from_name:from_port to start a
 * saturation experiment towards to_name:to_port.
 *
 * Visible steps: open a client socket to from_name:from_port, allocate the
 * SatExp_t request and the msgHost_t target, send GRASMSG_SAT_START, wait
 * (timeout argument 120) for GRASMSG_SAT_STARTED and check the errcode
 * carried by the answer. Once opened, the socket is closed on every visible
 * path.
 *
 * NOTE(review): this function uses gras_sock_client_open and
 * gras_msg_new_and_send, whereas the amok_bw_* functions above use
 * gras_socket_client and gras_msg_send; presumably this is code written for
 * an older API that is no longer compiled -- TODO confirm. */
415 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
416 const char* to_name,unsigned int to_port,
417 unsigned int msgSize, unsigned int timeout) {
426 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
427 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
428 __FILE__,__LINE__,xbt_error_name(errcode));
/* NOTE(review): when the first malloc succeeds and the second fails, no
 * release of `request` is visible in this branch. */
431 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
432 !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
433 fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
434 gras_sock_close(sock);
438 request->timeout=timeout;
439 request->msgSize=msgSize;
/* NOTE(review): unbounded strcpy(); the capacity of target->host is not
 * visible here, so a long to_name may overflow it -- confirm. */
441 strcpy(target->host,to_name);
442 target->port=to_port;
444 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2,
447 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
448 __FILE__,__LINE__,xbt_error_name(errcode));
449 gras_sock_close(sock);
452 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
453 fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
454 __FILE__,__LINE__,xbt_error_name(errcode));
455 gras_sock_close(sock);
/* The answer carries a msgError_t; a non-zero errcode means the peer could
 * not start the experiment. */
459 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
460 fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
461 __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
462 gras_msg_free(answer);
463 gras_sock_close(sock);
467 gras_msg_free(answer);
468 gras_sock_close(sock);
/* grasbw_cbSatStart: handler for a saturation start request; its replies to
 * the requester use GRASMSG_SAT_STARTED and GRASMSG_SAT_STOPPED.
 *
 * Steps visible below:
 *  1. read the target host/port and the msgSize/timeout from the message;
 *  2. contact the target, send it GRASMSG_SAT_BEGIN and wait (120) for
 *     GRASMSG_SAT_BEGUN;
 *  3. open a raw socket to the port announced in the answer and send a first
 *     train of data;
 *  4. report "Saturation started" to the requester;
 *  5. keep sending until a GRASMSG_SAT_STOP arrives or `timeout` elapses;
 *  6. send GRASMSG_SAT_END to the target, wait (60) for GRASMSG_SAT_ENDED and
 *     report with GRASMSG_SAT_STOPPED.
 * Failures are printed on stderr and, in most branches, reported back with
 * grasRepportError().
 *
 * NOTE(review): written against gras_sock_* / gras_rawsock_* /
 * gras_msg_new_and_send, not the API used by the amok_bw_* functions above;
 * presumably older code that is no longer compiled -- TODO confirm. */
472 int grasbw_cbSatStart(gras_msg_t *msg) {
476 double start; /* time to timeout */
478 /* specification of the test to run */
479 char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
480 unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
482 unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
483 unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
484 unsigned int raw_port;
/* Debug dump of the fields of the requester's socket. */
492 fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
493 fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
494 msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
495 msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
498 /* Negotiate the saturation with the peer */
499 if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
500 fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
501 xbt_error_name(errcode));
502 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
503 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
504 errcode,"Cannot contact peer.\n");
507 if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
508 fprintf(stderr,"cbSatStart(): Malloc error\n");
509 gras_sock_close(sock);
510 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
511 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
512 malloc_error,"Cannot build request.\n");
/* NOTE(review): re-reads the two fields already cached in the locals
 * `timeout` and `msgSize` above. */
516 request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
517 request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
519 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1,
521 fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
522 xbt_error_name(errcode));
523 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
524 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
525 errcode,"Cannot send request.\n");
526 gras_sock_close(sock);
530 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
531 fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
532 xbt_error_name(errcode));
533 gras_sock_close(sock);
535 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
536 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
538 "Cannot receive the ACK.\n");
/* An error reported by the target is relayed to the requester. */
542 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
543 fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
544 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
546 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
547 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
549 "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
550 gras_msg_free(answer);
551 gras_sock_close(sock);
/* The second part of the answer announces the raw port to saturate. */
555 raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
557 if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
558 fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
559 xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
561 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
562 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
563 errcode,"Cannot open raw socket.\n");
564 gras_sock_close(sock);
568 /* send a train of data before reporting that the experiment (XP) is started */
569 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
570 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
571 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
572 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
573 errcode,"Cannot raw send.\n");
574 gras_sock_close(sock);
575 gras_rawsock_close(raw);
/* Success is reported through the same helper, with no_error as status. */
579 grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
580 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
581 no_error,"Saturation started");
582 gras_msg_free(answer);
585 /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
/* NOTE(review): gras_msg_wait() stores the received message into `msg`, the
 * function's own parameter, so the msg->sock used further down presumably
 * designates the sender of SAT_STOP rather than the original requester --
 * confirm. */
587 while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error &&
588 gras_time()-start < timeout) {
589 if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
590 fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
591 /* nobody is interested in our error message. SAT_STOP will do nothing. */
592 gras_sock_close(sock);
593 gras_rawsock_close(raw);
597 if (gras_time()-start > timeout) {
598 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
599 gras_sock_close(sock);
600 gras_rawsock_close(raw);
604 /* Handle the SAT_STOP which broke the previous while */
606 if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
607 fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
609 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
610 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
611 errcode,"Sending SAT_END to peer failed.\n");
612 gras_sock_close(sock);
613 gras_rawsock_close(raw);
617 if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
618 fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
620 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
621 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
622 errcode,"Receiving SAT_ENDED from peer failed.\n");
623 gras_sock_close(sock);
624 gras_rawsock_close(raw);
/* Final report to the sender of SAT_STOP, then release everything. */
627 grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
628 "cbSatStart: Severe error: Cannot send error status to requester!!\n",
631 gras_sock_close(sock);
632 gras_rawsock_close(raw);
633 gras_msg_free(answer);
/* grasbw_cbSatBegin: handler on the receiving side of a saturation
 * experiment.
 *
 * Steps visible below: read msgSize/timeout from the message, allocate the
 * answer parts, open a raw server socket, acknowledge with GRASMSG_SAT_BEGUN
 * (announcing the port), then receive on the raw socket until a
 * GRASMSG_SAT_END arrives or `timeout` elapses, and finally answer with
 * GRASMSG_SAT_ENDED.
 *
 * NOTE(review): written against gras_rawsock_* / gras_msg_new_and_send, not
 * the API used by the amok_bw_* functions above; presumably older code that
 * is no longer compiled -- TODO confirm. */
639 int grasbw_cbSatBegin(gras_msg_t *msg) {
642 double start; /* timer */
644 unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
645 unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
/* NOTE(review): when the first malloc succeeds and the second fails, no
 * release of `request` is visible in this branch. */
650 if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
651 !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
652 fprintf(stderr,"cbSatBegin(): Malloc error\n");
653 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
654 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
655 malloc_error,"Malloc error");
/* Presumably 6666 and 8000 bound the range of ports to try -- TODO confirm
 * against gras_rawsock_server_open(). */
659 if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) {
660 fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
661 xbt_error_name(errcode));
662 grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
663 "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
664 errcode,"Cannot open raw socket");
/* NOTE(review): the announced port is obtained with a "get_peer_port" call
 * on a server socket -- confirm that it returns the local listening port. */
667 request->port=gras_rawsock_get_peer_port(raw);
668 request->msgSize=msgSize;
669 error->errcode=no_error;
670 error->errmsg[0]='\0';
671 if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
674 fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
675 xbt_error_name(errcode));
/* NOTE(review): gras_msg_wait() stores the received message into `msg`, the
 * function's own parameter; the msg->sock used for the final report is
 * presumably that of the SAT_END message -- confirm. */
681 while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
682 gras_time() - start < timeout) {
683 errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
684 if (errcode != timeout_error && errcode != no_error) {
685 fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
686 /* nobody is interested in our error message. SAT_END will do nothing. */
687 /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
691 if (gras_time()-start > timeout) {
692 fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
693 gras_rawsock_close(raw);
697 grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
698 "cbSatBegin: Cannot send SAT_ENDED.\n",
700 gras_rawsock_close(raw);
/* grasbw_saturate_stop: ask the process at from_name:from_port to stop its
 * saturation experiment.
 *
 * Visible steps: open a client socket, send GRASMSG_SAT_STOP (no payload
 * parts), wait (timeout argument 120) for GRASMSG_SAT_STOPPED and check the
 * errcode carried by the answer. Once opened, the socket is closed on every
 * visible path.
 *
 * NOTE(review): to_name and to_port are not used by any visible statement.
 * Like the other grasbw_* functions, this relies on gras_sock_client_open /
 * gras_msg_new_and_send; presumably older code that is no longer compiled --
 * TODO confirm. */
705 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
706 const char* to_name,unsigned int to_port) {
711 if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
712 fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
713 xbt_error_name(errcode));
717 if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
718 fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
719 xbt_error_name(errcode));
720 gras_sock_close(sock);
724 if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
725 fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
726 xbt_error_name(errcode));
727 gras_sock_close(sock);
/* A non-zero errcode in the answer means the peer failed to stop cleanly. */
731 if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
732 fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
733 xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
734 gras_msg_free(answer);
735 gras_sock_close(sock);
739 gras_msg_free(answer);
740 gras_sock_close(sock);