Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Removed a bunch of unused variables. Mostly some xbt_error_t errcode that
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities    */
4
5 /* Authors: Martin Quinson                                                  */
6 /* Copyright (C) 2003, 2004 the OURAGAN project.                            */
7
8 /* This program is free software; you can redistribute it and/or modify it
9    under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "amok/Bandwidth/bandwidth_private.h"
12 #include "gras/messages.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
15
16 static short _amok_bw_initialized = 0;
17
18 /**** code ****/
19 void amok_bw_init(void) {
20   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
21
22   if (_amok_bw_initialized)
23      return;
24    
25   amok_base_init();
26    
27   /* Build the datatype descriptions */ 
28   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
29   gras_datadesc_struct_append(bw_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
30   gras_datadesc_struct_append(bw_request_desc,"buf_size",gras_datadesc_by_name("unsigned int"));
31   gras_datadesc_struct_append(bw_request_desc,"exp_size",gras_datadesc_by_name("unsigned int"));
32   gras_datadesc_struct_append(bw_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
33   gras_datadesc_struct_close(bw_request_desc);
34   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
35
36   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
37   gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
38   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
39   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
40   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
41   gras_datadesc_struct_close(bw_res_desc);
42   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
43
44   sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
45   gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
46   gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
47   gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
48   gras_datadesc_struct_close(sat_request_desc);
49   sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
50    
51   /* Register the bandwidth messages */
52   gras_msgtype_declare("BW request",       bw_request_desc);
53   gras_msgtype_declare("BW result",        bw_res_desc);
54   gras_msgtype_declare("BW handshake",     bw_request_desc);
55   gras_msgtype_declare("BW handshake ACK", bw_request_desc);
56
57   /* Register the saturation messages */
58   gras_msgtype_declare("SAT start",   sat_request_desc);
59   gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
60   gras_msgtype_declare("SAT begin",   sat_request_desc);
61   gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
62   gras_msgtype_declare("SAT end",     NULL);
63   gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
64   gras_msgtype_declare("SAT stop",    NULL);
65   gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
66
67   /* Register the callbacks */
68   gras_cb_register(gras_msgtype_by_name("BW request"),
69                    &amok_bw_cb_bw_request);
70   gras_cb_register(gras_msgtype_by_name("BW handshake"),
71                    &amok_bw_cb_bw_handshake);
72
73   gras_cb_register(gras_msgtype_by_name("SAT start"),
74                    &amok_bw_cb_sat_start);
75   gras_cb_register(gras_msgtype_by_name("SAT begin"),
76                    &amok_bw_cb_sat_begin);
77   
78   _amok_bw_initialized =1;
79 }
80
81 void amok_bw_exit(void) {
82   if (! _amok_bw_initialized)
83     return;
84    
85   gras_cb_unregister(gras_msgtype_by_name("BW request"),
86                      &amok_bw_cb_bw_request);
87   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
88                      &amok_bw_cb_bw_handshake);
89
90   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
91                      &amok_bw_cb_sat_start);
92   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
93                      &amok_bw_cb_sat_begin);
94
95   _amok_bw_initialized = 0;
96 }
97
98    
99
100 /* ***************************************************************************
101  * Bandwidth tests
102  * ***************************************************************************/
103
104 /**
105  * amok_bw_test:
106  * 
107  * Conduct a test between the local host and @peer, and 
108  * report the result in last args
109  */
110 xbt_error_t amok_bw_test(gras_socket_t peer,
111                           unsigned int buf_size,unsigned int exp_size,unsigned int msg_size,
112                           /*OUT*/ double *sec, double *bw) {
113   gras_socket_t rawIn,rawOut; /* raw sockets for the experiments */
114   gras_socket_t sock_dummy; /* ignored arg to msg_wait */
115   int port;
116   xbt_error_t errcode;
117   bw_request_t request,request_ack;
118   
119   for (port = 5000, errcode = system_error;
120        errcode == system_error;
121        errcode = gras_socket_server_ext(++port,buf_size,1,&rawIn));
122   if (errcode != no_error) {
123     ERROR1("Error %s encountered while opening a raw socket",
124            xbt_error_name(errcode));
125     return errcode;
126   }
127         
128   request=xbt_new0(s_bw_request_t,1);
129   request->buf_size=buf_size;
130   request->exp_size=exp_size;
131   request->msg_size=msg_size;
132   request->host.name = NULL;
133   request->host.port = gras_socket_my_port(rawIn);
134   INFO1("Send an handshake to get the dude connect to port %d on me", request->host.port);
135
136   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
137     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
138     return errcode;
139   }
140   if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),&sock_dummy,&request_ack))) {
141     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
142             xbt_error_name(errcode));
143     return errcode;
144   }
145    
146   /* FIXME: What if there is a remote error? */
147    
148   if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),request_ack->host.port, buf_size,1,&rawOut))) {
149     ERROR3("Error %s encountered while opening the raw socket to %s:%d for BW test\n",
150             xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
151     return errcode;
152   }
153   xbt_free(request_ack);
154   INFO0("Got ACK");
155
156   *sec=gras_os_time();
157   if ((errcode=gras_socket_raw_send(rawOut,120,exp_size,msg_size)) ||
158       (errcode=gras_socket_raw_recv(rawIn,120,1,1))) {
159     ERROR1("Error %s encountered while sending the BW experiment.",
160             xbt_error_name(errcode));
161     gras_socket_close(rawOut);
162     gras_socket_close(rawIn);
163     return errcode;
164   }
165   *sec = gras_os_time() - *sec;
166   *bw = ((double)exp_size /* 8.0*/) / *sec / (1024.0 *1024.0);
167 INFO0("DOOONE");
168    
169   gras_socket_close(rawIn);
170   gras_socket_close(rawOut);
171   return no_error;
172 }
173
174
175 /* Callback to the "BW handshake" message: 
176    opens a server raw socket,
177    indicate its port in an "BW handshaked" message,
178    receive the corresponding data on the raw socket, 
179    close the raw socket
180 */
181 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
182                             void          *payload) {
183   gras_socket_t rawIn,rawOut;
184   bw_request_t request=*(bw_request_t*)payload;
185   bw_request_t answer;
186   xbt_error_t errcode;
187   int port;
188   
189   INFO2("Got an handshake to connect to %s:%d",
190         request->host.name,request->host.port);
191      
192   answer = xbt_new0(s_bw_request_t,1);
193   
194   for (port = 5000, errcode = system_error;
195        errcode == system_error;
196        errcode = gras_socket_server_ext(++port,request->buf_size,1,&rawIn));
197   if (errcode != no_error) {
198     ERROR1("Error %s encountered while opening a raw socket", xbt_error_name(errcode));
199     /* FIXME: tell error to remote */
200     return 1;
201   }
202
203   if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),request->host.port,
204                                       request->buf_size,1,&rawOut))) { 
205     ERROR3("Error '%s' encountered while opening a raw socket to %s:%d", 
206            xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
207     /* FIXME: tell error to remote */
208     return 1;
209   }
210    
211   answer->buf_size=request->buf_size;
212   answer->exp_size=request->exp_size;
213   answer->msg_size=request->msg_size;
214   answer->host.port=gras_socket_my_port(rawIn);
215
216   if ((errcode=gras_msg_send(expeditor,gras_msgtype_by_name("BW handshake ACK"),&answer))) {
217     ERROR1("Error %s encountered while sending the answer.",
218             xbt_error_name(errcode));
219     gras_socket_close(rawIn);
220     gras_socket_close(rawOut);
221     /* FIXME: tell error to remote */
222     return 1;
223   }
224   INFO4("BW handshake answered. buf_size=%d exp_size=%d msg_size=%d port=%d",
225         answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
226     
227   if ((errcode=gras_socket_raw_recv(rawIn, 120,request->exp_size,request->msg_size)) ||
228       (errcode=gras_socket_raw_send(rawOut,120,1,1))) {
229     ERROR1("Error %s encountered while receiving the experiment.",
230             xbt_error_name(errcode));
231     gras_socket_close(rawIn);
232     gras_socket_close(rawOut);
233     /* FIXME: tell error to remote ? */
234     return 1;
235   }
236   gras_socket_close(rawIn);
237   gras_socket_close(rawOut);
238   return 1;
239 }
240
241 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
242                           void            *payload) {return 1;}
243
244 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
245                          void             *payload) {return 1;} 
246 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
247                          void             *payload) {return 1;}
248
249 #if 0
250 /* function to request a BW test between two external hosts */
251 xbt_error_t grasbw_request(const char* from_name,unsigned int from_port,
252                            const char* to_name,unsigned int to_port,
253                            unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
254                            /*OUT*/ double *sec, double*bw) {
255   
256   gras_sock_t *sock;
257   gras_msg_t *answer;
258   xbt_error_t errcode;
259   /* The request */
260   BwExp_t *request;
261   msgHost_t *target;
262
263   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
264     fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
265             xbt_error_name(errcode));
266     return errcode;
267   }
268   if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
269       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
270     fprintf(stderr,"grasbw_test(): Malloc error\n");
271     gras_sock_close(sock);
272     return malloc_error;    
273   }
274
275   request->bufSize=bufSize;
276   request->expSize=expSize;
277   request->msgSize=msgSize;
278   strcpy(target->host,to_name);
279   target->port=to_port;
280   
281   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2, 
282                               target,1,
283                               request,1))) {
284     fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
285             xbt_error_name(errcode));
286     gras_sock_close(sock);
287     return errcode;
288   }
289   if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
290     fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
291             xbt_error_name(errcode));
292     gras_sock_close(sock);
293     return errcode;
294   }
295
296   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
297     fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
298             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
299     gras_msg_free(answer);
300     gras_sock_close(sock);
301     return errcode;
302   }
303
304   /*  fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t)); */
305   *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
306   *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
307
308   gras_msg_free(answer);
309   gras_sock_close(sock);
310   return no_error;
311 }
312
313 int grasbw_cbBWRequest(gras_msg_t *msg) {
314   /* specification of the test to run */
315   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
316   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
317
318   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
319   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
320   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
321   /* our answer */
322   msgError_t *error;
323   msgResult_t *res;
324
325   if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
326       !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
327     fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
328     return malloc_error;    
329   }
330
331   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
332                                   &(res[0].value),&(res[1].value) ))) {
333     fprintf(stderr,
334             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
335             __FILE__,__LINE__,xbt_error_name(error->errcode));
336     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
337     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
338                    error,1,
339                    res,2);
340     return 1;
341   }
342   res[0].timestamp = (unsigned int) gras_time();
343   res[1].timestamp = (unsigned int) gras_time();
344   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
345                  error,1,
346                  res,2);
347   gras_msg_free(msg);
348   return 1;
349 }
350
351 /* ***************************************************************************
352  * Link saturation
353  * ***************************************************************************/
354
355 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
356                                   const char* to_name,unsigned int to_port,
357                                   unsigned int msgSize, unsigned int timeout) {
358   gras_sock_t *sock;
359   xbt_error_t errcode;
360   /* The request */
361   SatExp_t *request;
362   msgHost_t *target;
363   /* answer */
364   gras_msg_t *answer;
365
366   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
367     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
368             __FILE__,__LINE__,xbt_error_name(errcode));
369     return errcode;
370   }
371   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
372       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
373     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
374     gras_sock_close(sock);
375     return malloc_error;    
376   }
377
378   request->timeout=timeout;
379   request->msgSize=msgSize;
380
381   strcpy(target->host,to_name);
382   target->port=to_port;
383
384   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
385                               target,1,
386                               request,1))) {
387     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
388             __FILE__,__LINE__,xbt_error_name(errcode));
389     gras_sock_close(sock);
390     return errcode;
391   }
392   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
393     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
394             __FILE__,__LINE__,xbt_error_name(errcode));
395     gras_sock_close(sock);
396     return errcode;
397   }
398
399   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
400     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
401             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
402     gras_msg_free(answer);
403     gras_sock_close(sock);
404     return errcode;
405   }
406
407   gras_msg_free(answer);
408   gras_sock_close(sock);
409   return no_error;
410 }
411
412 int grasbw_cbSatStart(gras_msg_t *msg) {
413   gras_rawsock_t *raw;
414   gras_sock_t *sock;
415   xbt_error_t errcode;
416   double start; /* time to timeout */
417
418   /* specification of the test to run */
419   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
420   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
421
422   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
423   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
424   unsigned int raw_port;
425
426   /* The request */
427   SatExp_t *request;
428   /* answer */
429   gras_msg_t *answer;
430
431   /*
432   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
433   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
434           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
435           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
436   */
437
438   /* Negociate the saturation with the peer */
439   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
440     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
441             xbt_error_name(errcode));
442     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
443                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
444                      errcode,"Cannot contact peer.\n");
445     return 1;
446   }
447   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
448     fprintf(stderr,"cbSatStart(): Malloc error\n");
449     gras_sock_close(sock);
450     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
451                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
452                      malloc_error,"Cannot build request.\n");
453     return 1;    
454   }
455
456   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
457   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
458
459   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
460                               request,1))) {
461     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
462             xbt_error_name(errcode));
463     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
464                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
465                      errcode,"Cannot send request.\n");
466     gras_sock_close(sock);
467     return 1;
468   }
469
470   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
471     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
472             xbt_error_name(errcode));
473     gras_sock_close(sock);
474
475     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
476                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
477                      errcode,
478                      "Cannot receive the ACK.\n");
479     return 1;
480   }
481
482   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
483     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
484             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
485
486     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
487                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
488                      errcode,
489                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
490     gras_msg_free(answer);
491     gras_sock_close(sock);
492     return 1;
493   }
494
495   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
496
497   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
498     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
499             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
500
501     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
502                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
503                      errcode,"Cannot open raw socket.\n");
504     gras_sock_close(sock);
505     return 1;
506   }
507
508   /* send a train of data before repporting that XP is started */
509   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
510     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
511     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
512                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
513                      errcode,"Cannot raw send.\n");
514     gras_sock_close(sock);
515     gras_rawsock_close(raw);
516     return 1;
517   }
518   
519   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
520                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
521                    no_error,"Saturation started");
522   gras_msg_free(answer);
523   gras_msg_free(msg);
524   
525   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
526   start=gras_time();
527   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
528          gras_time()-start < timeout) {
529     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
530       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
531       /* our error message do not interess anyone. SAT_STOP will do nothing. */
532       gras_sock_close(sock);
533       gras_rawsock_close(raw);
534       return 1;
535     } 
536   }
537   if (gras_time()-start > timeout) {
538     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
539     gras_sock_close(sock);
540     gras_rawsock_close(raw);
541     return 1;
542   }
543
544   /* Handle the SAT_STOP which broke the previous while */
545   
546   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
547     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
548
549     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
550                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
551                      errcode,"Sending SAT_END to peer failed.\n");
552     gras_sock_close(sock);
553     gras_rawsock_close(raw);
554     return 1;
555   }
556   
557   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
558     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
559
560     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
561                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
562                      errcode,"Receiving SAT_ENDED from peer failed.\n");
563     gras_sock_close(sock);
564     gras_rawsock_close(raw);
565     return 1;
566   }
567   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
568                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
569                    no_error,"");
570
571   gras_sock_close(sock);
572   gras_rawsock_close(raw);
573   gras_msg_free(answer);
574   gras_msg_free(msg);
575
576   return 1;  
577 }
578
579 int grasbw_cbSatBegin(gras_msg_t *msg) {
580   gras_rawsock_t *raw;
581   xbt_error_t errcode;
582   double start; /* timer */
583   /* request */
584   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
585   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
586   /* answer */
587   SatExp_t *request;
588   msgError_t *error;
589
590   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
591       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
592     fprintf(stderr,"cbSatBegin(): Malloc error\n");
593     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
594                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
595                      malloc_error,"Malloc error");
596     return 1;
597   }
598
599   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
600     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
601             xbt_error_name(errcode));
602     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
603                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
604                      errcode,"Cannot open raw socket");
605     return 1;
606   }
607   request->port=gras_rawsock_get_peer_port(raw);
608   request->msgSize=msgSize;
609   error->errcode=no_error;
610   error->errmsg[0]='\0';
611   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
612                               error,1,
613                               request,1))) {
614     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
615             xbt_error_name(errcode));
616     return 1;
617   }
618   gras_msg_free(msg);
619
620   start=gras_time();
621   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
622          gras_time() - start < timeout) {
623     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
624     if (errcode != timeout_error && errcode != no_error) {
625       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
626       /* our error message do not interess anyone. SAT_END will do nothing. */
627       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
628       return 1;
629     } 
630   }
631   if (gras_time()-start > timeout) {
632     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
633     gras_rawsock_close(raw);
634     return 1;
635   }
636
637   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
638                    "cbSatBegin: Cannot send SAT_ENDED.\n",
639                    no_error,"");
640   gras_rawsock_close(raw);
641   gras_msg_free(msg);
642   return 1;
643 }
644
645 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
646                                  const char* to_name,unsigned int to_port) {
647   xbt_error_t errcode;
648   gras_sock_t *sock;
649   gras_msg_t *answer;
650
651   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
652     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
653             xbt_error_name(errcode));
654     return errcode;
655   }
656
657   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
658     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
659             xbt_error_name(errcode));
660     gras_sock_close(sock);
661     return errcode;
662   }
663
664   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
665     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
666             xbt_error_name(errcode));
667     gras_sock_close(sock);
668     return errcode;
669   }
670
671   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
672     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
673             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
674     gras_msg_free(answer);
675     gras_sock_close(sock);
676     return errcode;
677   }
678
679   gras_msg_free(answer);
680   gras_sock_close(sock);
681
682   return no_error;
683 }
684 #endif