Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
88f2fec2c30530efa201d36a722fc095d10b2d64
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003, 2004 Martin Quinson. All rights reserved.            */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
14
15 static short _amok_bw_initialized = 0;
16
17 /**** code ****/
18 void amok_bw_init(void) {
19   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
20
21   if (_amok_bw_initialized)
22      return;
23    
24   amok_base_init();
25    
26   /* Build the datatype descriptions */ 
27   bw_request_desc = gras_datadesc_struct("s_bw_request_t");
28   gras_datadesc_struct_append(bw_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
29   gras_datadesc_struct_append(bw_request_desc,"buf_size",gras_datadesc_by_name("unsigned int"));
30   gras_datadesc_struct_append(bw_request_desc,"exp_size",gras_datadesc_by_name("unsigned int"));
31   gras_datadesc_struct_append(bw_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
32   gras_datadesc_struct_close(bw_request_desc);
33   bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
34
35   bw_res_desc = gras_datadesc_struct("s_bw_res_t");
36   gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
37   gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
38   gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
39   gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
40   gras_datadesc_struct_close(bw_res_desc);
41   bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
42
43   sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
44   gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
45   gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
46   gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
47   gras_datadesc_struct_close(sat_request_desc);
48   sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
49    
50   /* Register the bandwidth messages */
51   gras_msgtype_declare("BW request",       bw_request_desc);
52   gras_msgtype_declare("BW result",        bw_res_desc);
53   gras_msgtype_declare("BW handshake",     bw_request_desc);
54   gras_msgtype_declare("BW handshake ACK", bw_request_desc);
55
56   /* Register the saturation messages */
57   gras_msgtype_declare("SAT start",   sat_request_desc);
58   gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
59   gras_msgtype_declare("SAT begin",   sat_request_desc);
60   gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
61   gras_msgtype_declare("SAT end",     NULL);
62   gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
63   gras_msgtype_declare("SAT stop",    NULL);
64   gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
65
66   /* Register the callbacks */
67   gras_cb_register(gras_msgtype_by_name("BW request"),
68                    &amok_bw_cb_bw_request);
69   gras_cb_register(gras_msgtype_by_name("BW handshake"),
70                    &amok_bw_cb_bw_handshake);
71
72   gras_cb_register(gras_msgtype_by_name("SAT start"),
73                    &amok_bw_cb_sat_start);
74   gras_cb_register(gras_msgtype_by_name("SAT begin"),
75                    &amok_bw_cb_sat_begin);
76   
77   _amok_bw_initialized =1;
78 }
79
80 void amok_bw_exit(void) {
81   if (! _amok_bw_initialized)
82     return;
83    
84   gras_cb_unregister(gras_msgtype_by_name("BW request"),
85                      &amok_bw_cb_bw_request);
86   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
87                      &amok_bw_cb_bw_handshake);
88
89   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
90                      &amok_bw_cb_sat_start);
91   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
92                      &amok_bw_cb_sat_begin);
93
94   _amok_bw_initialized = 0;
95 }
96
97    
98
99 /* ***************************************************************************
100  * Bandwidth tests
101  * ***************************************************************************/
102
103 /**
104  * amok_bw_test:
105  * 
106  * Conduct a test between the local host and @peer, and 
107  * report the result in last args
108  */
109 xbt_error_t amok_bw_test(gras_socket_t peer,
110                           unsigned int buf_size,unsigned int exp_size,unsigned int msg_size,
111                           /*OUT*/ double *sec, double *bw) {
112   gras_socket_t rawIn,rawOut; /* raw sockets for the experiments */
113   gras_socket_t sock_dummy; /* ignored arg to msg_wait */
114   int port;
115   xbt_error_t errcode;
116   bw_request_t request,request_ack;
117   
118   for (port = 5000, errcode = system_error;
119        errcode == system_error;
120        errcode = gras_socket_server_ext(++port,buf_size,1,&rawIn));
121   if (errcode != no_error) {
122     ERROR1("Error %s encountered while opening a raw socket",
123            xbt_error_name(errcode));
124     return errcode;
125   }
126         
127   request=xbt_new0(s_bw_request_t,1);
128   request->buf_size=buf_size;
129   request->exp_size=exp_size;
130   request->msg_size=msg_size;
131   request->host.name = NULL;
132   request->host.port = gras_socket_my_port(rawIn);
133   INFO1("Send an handshake to get the dude connect to port %d on me", request->host.port);
134
135   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
136     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
137     return errcode;
138   }
139   if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),&sock_dummy,&request_ack))) {
140     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
141             xbt_error_name(errcode));
142     return errcode;
143   }
144    
145   /* FIXME: What if there is a remote error? */
146    
147   if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),request_ack->host.port, buf_size,1,&rawOut))) {
148     ERROR3("Error %s encountered while opening the raw socket to %s:%d for BW test\n",
149             xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
150     return errcode;
151   }
152   free(request_ack);
153   INFO0("Got ACK");
154
155   *sec=gras_os_time();
156   if ((errcode=gras_socket_raw_send(rawOut,120,exp_size,msg_size)) ||
157       (errcode=gras_socket_raw_recv(rawIn,120,1,1))) {
158     ERROR1("Error %s encountered while sending the BW experiment.",
159             xbt_error_name(errcode));
160     gras_socket_close(rawOut);
161     gras_socket_close(rawIn);
162     return errcode;
163   }
164   *sec = gras_os_time() - *sec;
165   *bw = ((double)exp_size /* 8.0*/) / *sec / (1024.0 *1024.0);
166 INFO0("DOOONE");
167    
168   gras_socket_close(rawIn);
169   gras_socket_close(rawOut);
170   return no_error;
171 }
172
173
174 /* Callback to the "BW handshake" message: 
175    opens a server raw socket,
176    indicate its port in an "BW handshaked" message,
177    receive the corresponding data on the raw socket, 
178    close the raw socket
179 */
180 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
181                             void          *payload) {
182   gras_socket_t rawIn,rawOut;
183   bw_request_t request=*(bw_request_t*)payload;
184   bw_request_t answer;
185   xbt_error_t errcode;
186   int port;
187   
188   INFO2("Got an handshake to connect to %s:%d",
189         request->host.name,request->host.port);
190      
191   answer = xbt_new0(s_bw_request_t,1);
192   
193   for (port = 5000, errcode = system_error;
194        errcode == system_error;
195        errcode = gras_socket_server_ext(++port,request->buf_size,1,&rawIn));
196   if (errcode != no_error) {
197     ERROR1("Error %s encountered while opening a raw socket", xbt_error_name(errcode));
198     /* FIXME: tell error to remote */
199     return 1;
200   }
201
202   if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),request->host.port,
203                                       request->buf_size,1,&rawOut))) { 
204     ERROR3("Error '%s' encountered while opening a raw socket to %s:%d", 
205            xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
206     /* FIXME: tell error to remote */
207     return 1;
208   }
209    
210   answer->buf_size=request->buf_size;
211   answer->exp_size=request->exp_size;
212   answer->msg_size=request->msg_size;
213   answer->host.port=gras_socket_my_port(rawIn);
214
215   if ((errcode=gras_msg_send(expeditor,gras_msgtype_by_name("BW handshake ACK"),&answer))) {
216     ERROR1("Error %s encountered while sending the answer.",
217             xbt_error_name(errcode));
218     gras_socket_close(rawIn);
219     gras_socket_close(rawOut);
220     /* FIXME: tell error to remote */
221     return 1;
222   }
223   INFO4("BW handshake answered. buf_size=%d exp_size=%d msg_size=%d port=%d",
224         answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
225     
226   if ((errcode=gras_socket_raw_recv(rawIn, 120,request->exp_size,request->msg_size)) ||
227       (errcode=gras_socket_raw_send(rawOut,120,1,1))) {
228     ERROR1("Error %s encountered while receiving the experiment.",
229             xbt_error_name(errcode));
230     gras_socket_close(rawIn);
231     gras_socket_close(rawOut);
232     /* FIXME: tell error to remote ? */
233     return 1;
234   }
235   gras_socket_close(rawIn);
236   gras_socket_close(rawOut);
237   return 1;
238 }
239
240 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
241                           void            *payload) {return 1;}
242
243 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
244                          void             *payload) {return 1;} 
245 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
246                          void             *payload) {return 1;}
247
248 #if 0
249 /* Legacy code, compiled out by the enclosing #if 0.
 *
 * Function to request a BW test between two external hosts: contacts the
 * process at from_name:from_port and sends it a GRASMSG_BW_REQUEST naming
 * to_name:to_port together with bufSize, expSize and msgSize.
 * It then waits (timeout 240) for a GRASMSG_BW_RESULT.  If that answer carries
 * a non-zero error code, the code is returned; otherwise the two result values
 * are stored in *sec and *bw and no_error is returned.
 * Any locally failing step returns its own error code (malloc_error for an
 * allocation failure).
 *
 * NOTE(review): `request` and `target` are never freed in this function;
 * whether gras_msg_new_and_send() takes ownership is not visible here.  If the
 * first malloc succeeds and the second fails, `request` is leaked. */
250 xbt_error_t grasbw_request(const char* from_name,unsigned int from_port,
251                            const char* to_name,unsigned int to_port,
252                            unsigned int bufSize,unsigned int expSize,unsigned int msgSize,
253                            /*OUT*/ double *sec, double*bw) {
254   
255   gras_sock_t *sock;
256   gras_msg_t *answer;
257   xbt_error_t errcode;
258   /* The request */
259   BwExp_t *request;
260   msgHost_t *target;
261
/* Open the control connection to the host that will run the test */
262   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
263     fprintf(stderr,"grasbw_request(): Error %s encountered while contacting the actuator\n",
264             xbt_error_name(errcode));
265     return errcode;
266   }
267   if (!(request=(BwExp_t *)malloc(sizeof(BwExp_t))) ||
268       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
269     fprintf(stderr,"grasbw_test(): Malloc error\n");
270     gras_sock_close(sock);
271     return malloc_error;    
272   }
273
/* Fill in the experiment parameters and the target of the measurement */
274   request->bufSize=bufSize;
275   request->expSize=expSize;
276   request->msgSize=msgSize;
/* NOTE(review): unbounded copy; the size of target->host is not visible here */
277   strcpy(target->host,to_name);
278   target->port=to_port;
279   
/* The message carries two items: the target host first, then the experiment */
280   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_BW_REQUEST, 2, 
281                               target,1,
282                               request,1))) {
283     fprintf(stderr,"grasbw_request(): Error %s encountered while sending the request.\n",
284             xbt_error_name(errcode));
285     gras_sock_close(sock);
286     return errcode;
287   }
288   if ((errcode=gras_msg_wait(240,GRASMSG_BW_RESULT,&answer))) {
289     fprintf(stderr,"grasbw_request(): Error %s encountered while waiting for the answer.\n",
290             xbt_error_name(errcode));
291     gras_sock_close(sock);
292     return errcode;
293   }
294
/* Item 0 of the answer is the remote error status */
295   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
296     fprintf(stderr,"grasbw_request(): Peer reported error %s (%s).\n",
297             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
298     gras_msg_free(answer);
299     gras_sock_close(sock);
300     return errcode;
301   }
302
303   /*  fprintf(stderr,"sec=%p",gras_msg_ctn(answer,1,0,msgResult_t)); */
/* Item 1 of the answer holds two results: the first goes to *sec, the second to *bw */
304   *sec=gras_msg_ctn(answer,1,0,msgResult_t).value;
305   *bw=gras_msg_ctn(answer,1,1,msgResult_t).value;
306
307   gras_msg_free(answer);
308   gras_sock_close(sock);
309   return no_error;
310 }
311
/* Legacy code, compiled out by the enclosing #if 0.
 *
 * Handler for a bandwidth request message: reads the target host/port (item 0)
 * and the experiment sizes (item 1) from msg, runs grasbw_test() against that
 * target and sends a GRASMSG_BW_RESULT back on msg->sock.  The result message
 * carries one msgError_t followed by two msgResult_t.
 * Returns 1 after a reply was attempted, or malloc_error when the answer
 * buffers cannot be allocated.
 *
 * NOTE(review): on the grasbw_test() failure path msg is not freed, unlike on
 * the success path.  error->errmsg is only written on that failure path, so on
 * success the freshly malloc'ed error is sent with errmsg unset.  `error` and
 * `res` are never freed here; ownership after gras_msg_new_and_send() is not
 * visible from this file. */
312 int grasbw_cbBWRequest(gras_msg_t *msg) {
313   /* specification of the test to run */
314   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
315   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
316
317   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
318   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
319   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
320   /* our answer */
321   msgError_t *error;
322   msgResult_t *res;
323
/* res is an array of two results */
324   if (!(error=(msgError_t *)malloc(sizeof(msgError_t))) ||
325       !(res=(msgResult_t *)malloc(sizeof(msgResult_t) * 2))) {
326     fprintf(stderr,"%s:%d:grasbw_cbRequest: Malloc error\n",__FILE__,__LINE__);
327     return malloc_error;    
328   }
329
/* The test writes its two outputs into res[0].value and res[1].value
   (presumably seconds and bandwidth, in that order -- confirm against grasbw_test) */
330   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
331                                   &(res[0].value),&(res[1].value) ))) {
332     fprintf(stderr,
333             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
334             __FILE__,__LINE__,xbt_error_name(error->errcode));
/* NOTE(review): strncpy does not guarantee NUL termination if the text fills ERRMSG_LEN */
335     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
336     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
337                    error,1,
338                    res,2);
339     return 1;
340   }
/* Stamp both results with the current time before replying */
341   res[0].timestamp = (unsigned int) gras_time();
342   res[1].timestamp = (unsigned int) gras_time();
343   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
344                  error,1,
345                  res,2);
346   gras_msg_free(msg);
347   return 1;
348 }
349
350 /* ***************************************************************************
351  * Link saturation
352  * ***************************************************************************/
353
/* Legacy code, compiled out by the enclosing #if 0.
 *
 * Ask the process at from_name:from_port to start saturating the link towards
 * to_name:to_port.  Sends a GRASMSG_SAT_START carrying the target host and a
 * SatExp_t with msgSize and timeout, then waits (timeout 120) for the
 * GRASMSG_SAT_STARTED acknowledgement.
 * Returns no_error on success, the error code of a failing local step
 * (malloc_error for an allocation failure), or the error code reported by the
 * peer in the acknowledgement.
 *
 * NOTE(review): `request` and `target` are never freed here; ownership after
 * gras_msg_new_and_send() is not visible from this file.  If the first malloc
 * succeeds and the second fails, `request` is leaked. */
354 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
355                                   const char* to_name,unsigned int to_port,
356                                   unsigned int msgSize, unsigned int timeout) {
357   gras_sock_t *sock;
358   xbt_error_t errcode;
359   /* The request */
360   SatExp_t *request;
361   msgHost_t *target;
362   /* answer */
363   gras_msg_t *answer;
364
/* Open the control connection to the host that will emit the traffic */
365   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
366     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
367             __FILE__,__LINE__,xbt_error_name(errcode));
368     return errcode;
369   }
370   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
371       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
372     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
373     gras_sock_close(sock);
374     return malloc_error;    
375   }
376
377   request->timeout=timeout;
378   request->msgSize=msgSize;
379
/* NOTE(review): unbounded copy; the size of target->host is not visible here */
380   strcpy(target->host,to_name);
381   target->port=to_port;
382
/* The message carries two items: the target host first, then the experiment */
383   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
384                               target,1,
385                               request,1))) {
386     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
387             __FILE__,__LINE__,xbt_error_name(errcode));
388     gras_sock_close(sock);
389     return errcode;
390   }
391   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
392     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
393             __FILE__,__LINE__,xbt_error_name(errcode));
394     gras_sock_close(sock);
395     return errcode;
396   }
397
/* Item 0 of the acknowledgement is the remote error status */
398   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
399     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
400             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
401     gras_msg_free(answer);
402     gras_sock_close(sock);
403     return errcode;
404   }
405
406   gras_msg_free(answer);
407   gras_sock_close(sock);
408   return no_error;
409 }
410
/* Legacy code, compiled out by the enclosing #if 0.
 *
 * Handler for a saturation start message; this side emits the traffic.
 *  1. Contacts the target named in msg (item 0) and sends it GRASMSG_SAT_BEGIN
 *     with the msgSize/timeout found in msg (item 1).
 *  2. Waits (timeout 120) for GRASMSG_SAT_BEGUN and reads the raw port from it.
 *  3. Opens a raw client socket to that port, sends one first train of data,
 *     then reports GRASMSG_SAT_STARTED to the requester.
 *  4. Keeps sending on the raw socket until a GRASMSG_SAT_STOP message arrives
 *     or `timeout` has elapsed since the loop started.
 *  5. On SAT_STOP: sends GRASMSG_SAT_END to the target, waits (timeout 60) for
 *     GRASMSG_SAT_ENDED and reports GRASMSG_SAT_STOPPED to the requester.
 * Failures before the saturation loop are reported to the requester through
 * grasRepportError() with GRASMSG_SAT_STARTED; failures while stopping use
 * GRASMSG_SAT_STOPPED.  Always returns 1.
 *
 * NOTE(review): `request` is never freed here (ownership after
 * gras_msg_new_and_send() is not visible from this file), and several error
 * returns leave `answer` and/or `msg` unreleased. */
411 int grasbw_cbSatStart(gras_msg_t *msg) {
412   gras_rawsock_t *raw;
413   gras_sock_t *sock;
414   xbt_error_t errcode;
415   double start; /* time to timeout */
416
417   /* specification of the test to run */
418   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
419   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
420
421   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
422   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
423   unsigned int raw_port;
424
425   /* The request */
426   SatExp_t *request;
427   /* answer */
428   gras_msg_t *answer;
429
430   /*
431   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
432   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
433           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
434           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
435   */
436
437   /* Negotiate the saturation with the peer */
438   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
439     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
440             xbt_error_name(errcode));
441     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
442                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
443                      errcode,"Cannot contact peer.\n");
444     return 1;
445   }
446   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
447     fprintf(stderr,"cbSatStart(): Malloc error\n");
448     gras_sock_close(sock);
449     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
450                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
451                      malloc_error,"Cannot build request.\n");
452     return 1;    
453   }
454
/* Forward the experiment parameters unchanged to the receiving side */
455   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
456   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
457
458   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
459                               request,1))) {
460     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
461             xbt_error_name(errcode));
462     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
463                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
464                      errcode,"Cannot send request.\n");
465     gras_sock_close(sock);
466     return 1;
467   }
468
469   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
470     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
471             xbt_error_name(errcode));
472     gras_sock_close(sock);
473
474     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
475                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
476                      errcode,
477                      "Cannot receive the ACK.\n");
478     return 1;
479   }
480
/* Item 0 of the acknowledgement is the peer's error status */
481   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
482     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
483             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
484
485     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
486                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
487                      errcode,
488                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
489     gras_msg_free(answer);
490     gras_sock_close(sock);
491     return 1;
492   }
493
/* Item 1 of the acknowledgement tells which raw port to connect to */
494   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
495
496   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
497     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
498             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
499
500     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
501                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
502                      errcode,"Cannot open raw socket.\n");
503     gras_sock_close(sock);
504     return 1;
505   }
506
507   /* send a train of data before reporting that XP is started */
508   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
509     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
510     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
511                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
512                      errcode,"Cannot raw send.\n");
513     gras_sock_close(sock);
514     gras_rawsock_close(raw);
515     return 1;
516   }
517   
/* Success is reported through the same helper, with no_error as the status */
518   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
519                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
520                    no_error,"Saturation started");
521   gras_msg_free(answer);
522   gras_msg_free(msg);
523   
524   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
/* msg was freed just above and is reused here as the output of gras_msg_wait() */
525   start=gras_time();
526   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
527          gras_time()-start < timeout) {
528     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
529       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
530       /* our error message is of no interest to anyone. SAT_STOP will do nothing. */
531       gras_sock_close(sock);
532       gras_rawsock_close(raw);
533       return 1;
534     } 
535   }
/* NOTE(review): this test is also true when SAT_STOP arrived after the deadline,
   in which case the stop request is dropped without any answer */
536   if (gras_time()-start > timeout) {
537     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
538     gras_sock_close(sock);
539     gras_rawsock_close(raw);
540     return 1;
541   }
542
543   /* Handle the SAT_STOP which broke the previous while */
544   
545   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
546     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
547
548     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
549                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
550                      errcode,"Sending SAT_END to peer failed.\n");
551     gras_sock_close(sock);
552     gras_rawsock_close(raw);
553     return 1;
554   }
555   
556   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
557     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
558
559     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
560                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
561                      errcode,"Receiving SAT_ENDED from peer failed.\n");
562     gras_sock_close(sock);
563     gras_rawsock_close(raw);
564     return 1;
565   }
/* Tell whoever sent SAT_STOP that the saturation is over */
566   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
567                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
568                    no_error,"");
569
570   gras_sock_close(sock);
571   gras_rawsock_close(raw);
572   gras_msg_free(answer);
573   gras_msg_free(msg);
574
575   return 1;  
576 }
577
/* Legacy code, compiled out by the enclosing #if 0.
 *
 * Handler for a saturation begin message; this side receives the traffic.
 * Opens a raw server socket, answers GRASMSG_SAT_BEGUN on msg->sock with a
 * no_error status and a SatExp_t holding the raw port and msgSize, then keeps
 * receiving on the raw socket until a GRASMSG_SAT_END message arrives or
 * `timeout` has elapsed since the loop started.  On SAT_END it reports
 * GRASMSG_SAT_ENDED to the sender of that message.  Always returns 1.
 *
 * NOTE(review): `request` and `error` are never freed here (ownership after
 * gras_msg_new_and_send() is not visible from this file), and the error
 * returns after the raw socket was opened, except the timeout one, do not
 * close it. */
578 int grasbw_cbSatBegin(gras_msg_t *msg) {
579   gras_rawsock_t *raw;
580   xbt_error_t errcode;
581   double start; /* timer */
582   /* request */
583   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
584   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
585   /* answer */
586   SatExp_t *request;
587   msgError_t *error;
588
589   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
590       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
591     fprintf(stderr,"cbSatBegin(): Malloc error\n");
592     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
593                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
594                      malloc_error,"Malloc error");
595     return 1;
596   }
597
/* NOTE(review): 6666 and 8000 are presumably the bounds of a port range to try -- confirm */
598   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
599     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
600             xbt_error_name(errcode));
601     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
602                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
603                      errcode,"Cannot open raw socket");
604     return 1;
605   }
/* Advertise the raw port; the emitter reads it from the SAT_BEGUN answer.
   NOTE(review): presumably the local listening port despite the "peer" in the
   accessor's name -- confirm */
606   request->port=gras_rawsock_get_peer_port(raw);
607   request->msgSize=msgSize;
608   error->errcode=no_error;
609   error->errmsg[0]='\0';
610   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
611                               error,1,
612                               request,1))) {
613     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
614             xbt_error_name(errcode));
615     return 1;
616   }
617   gras_msg_free(msg);
618
/* msg was freed just above and is reused here as the output of gras_msg_wait() */
619   start=gras_time();
620   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
621          gras_time() - start < timeout) {
622     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
623     if (errcode != timeout_error && errcode != no_error) {
624       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
625       /* our error message is of no interest to anyone. SAT_END will do nothing. */
626       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
627       return 1;
628     } 
629   }
630   if (gras_time()-start > timeout) {
631     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
632     gras_rawsock_close(raw);
633     return 1;
634   }
635
/* SAT_END was received: acknowledge it to its sender */
636   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
637                    "cbSatBegin: Cannot send SAT_ENDED.\n",
638                    no_error,"");
639   gras_rawsock_close(raw);
640   gras_msg_free(msg);
641   return 1;
642 }
643
/* Legacy code, compiled out by the enclosing #if 0.
 *
 * Ask the process at from_name:from_port to stop a running saturation: sends
 * it a GRASMSG_SAT_STOP with no payload items and waits (timeout 120) for the
 * GRASMSG_SAT_STOPPED acknowledgement.
 * Returns no_error on success, the error code of a failing local step, or the
 * error code reported by the peer in the acknowledgement.
 * to_name and to_port are not used by this function. */
644 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
645                                  const char* to_name,unsigned int to_port) {
646   xbt_error_t errcode;
647   gras_sock_t *sock;
648   gras_msg_t *answer;
649
650   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
651     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
652             xbt_error_name(errcode));
653     return errcode;
654   }
655
656   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
657     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
658             xbt_error_name(errcode));
659     gras_sock_close(sock);
660     return errcode;
661   }
662
663   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
664     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
665             xbt_error_name(errcode));
666     gras_sock_close(sock);
667     return errcode;
668   }
669
/* Item 0 of the acknowledgement is the remote error status */
670   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
671     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
672             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
673     gras_msg_free(answer);
674     gras_sock_close(sock);
675     return errcode;
676   }
677
678   gras_msg_free(answer);
679   gras_sock_close(sock);
680
681   return no_error;
682 }
683 #endif