Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bw_request now works
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
14
15 static short _amok_bw_initialized = 0;
16
17 /**** code ****/
18 void amok_bw_init(void) {
19   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
20
21   amok_base_init();
22
23   if (! _amok_bw_initialized) {
24         
25      /* Build the datatype descriptions */ 
26      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27      gras_datadesc_struct_append(bw_request_desc,"host",
28                                  gras_datadesc_by_name("xbt_host_t"));
29      gras_datadesc_struct_append(bw_request_desc,"buf_size",
30                                  gras_datadesc_by_name("unsigned long int"));
31      gras_datadesc_struct_append(bw_request_desc,"exp_size",
32                                  gras_datadesc_by_name("unsigned long int"));
33      gras_datadesc_struct_append(bw_request_desc,"msg_size",
34                                  gras_datadesc_by_name("unsigned long int"));
35      gras_datadesc_struct_close(bw_request_desc);
36      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
37
38      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
39      gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
40      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_close(bw_res_desc);
44      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
45
46      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50      gras_datadesc_struct_close(sat_request_desc);
51      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
52      
53      /* Register the bandwidth messages */
54      gras_msgtype_declare("BW request",       bw_request_desc);
55      gras_msgtype_declare("BW result",        bw_res_desc);
56      gras_msgtype_declare("BW handshake",     bw_request_desc);
57      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
58      
59      /* Register the saturation messages */
60      gras_msgtype_declare("SAT start",   sat_request_desc);
61      gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
62      gras_msgtype_declare("SAT begin",   sat_request_desc);
63      gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
64      gras_msgtype_declare("SAT end",     NULL);
65      gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
66      gras_msgtype_declare("SAT stop",    NULL);
67      gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
68   }
69    
70   /* Register the callbacks */
71   gras_cb_register(gras_msgtype_by_name("BW request"),
72                    &amok_bw_cb_bw_request);
73   gras_cb_register(gras_msgtype_by_name("BW handshake"),
74                    &amok_bw_cb_bw_handshake);
75
76   gras_cb_register(gras_msgtype_by_name("SAT start"),
77                    &amok_bw_cb_sat_start);
78   gras_cb_register(gras_msgtype_by_name("SAT begin"),
79                    &amok_bw_cb_sat_begin);
80   
81   _amok_bw_initialized =1;
82 }
83
84 void amok_bw_exit(void) {
85   if (! _amok_bw_initialized)
86     return;
87    
88   gras_cb_unregister(gras_msgtype_by_name("BW request"),
89                      &amok_bw_cb_bw_request);
90   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
91                      &amok_bw_cb_bw_handshake);
92
93   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
94                      &amok_bw_cb_sat_start);
95   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
96                      &amok_bw_cb_sat_begin);
97
98   _amok_bw_initialized = 0;
99 }
100
101    
102
103 /* ***************************************************************************
104  * Bandwidth tests
105  * ***************************************************************************/
106
107 /**
108  * \brief bandwidth measurement between localhost and @peer
109  * 
110  * Results are reported in last args, and sizes are in kb.
111  */
112 xbt_error_t amok_bw_test(gras_socket_t peer,
113                          unsigned long int buf_size,
114                          unsigned long int exp_size,
115                          unsigned long int msg_size,
116                          /*OUT*/ double *sec, double *bw) {
117
118   /* Measurement sockets for the experiments */
119   gras_socket_t measMasterIn,measIn,measOut;
120   int port;
121   xbt_error_t errcode;
122   bw_request_t request,request_ack;
123   
124   for (port = 5000, errcode = system_error;
125        errcode == system_error && port < 10000;
126        errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
127   if (errcode != no_error) {
128     ERROR1("Error %s encountered while opening a measurement socket",
129            xbt_error_name(errcode));
130     return errcode;
131   }
132         
133   request=xbt_new0(s_bw_request_t,1);
134   request->buf_size=buf_size*1024;
135   request->exp_size=exp_size*1024;
136   request->msg_size=msg_size*1024;
137   request->host.name = NULL;
138   request->host.port = gras_socket_my_port(measMasterIn);
139   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)", 
140         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
141         buf_size,request->buf_size);
142
143   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
144     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
145     return errcode;
146   }
147   TRY(gras_socket_meas_accept(measMasterIn,&measIn));
148
149   if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
150                              NULL,&request_ack))) {
151     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
152             xbt_error_name(errcode));
153     return errcode;
154   }
155    
156   /* FIXME: What if there is a remote error? */
157    
158   if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
159                                      request_ack->host.port, 
160                                      request->buf_size,1,&measOut))) {
161     ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
162             xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
163     return errcode;
164   }
165   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
166
167   *sec=gras_os_time();
168   TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
169   TRY(gras_socket_meas_recv(measIn,120,1,1));
170
171   /*catch
172     ERROR1("Error %s encountered while sending the BW experiment.",
173             xbt_error_name(errcode));
174     gras_socket_close(measOut);
175     gras_socket_close(measMasterIn);
176     gras_socket_close(measIn);
177   */
178
179   *sec = gras_os_time() - *sec;
180   *bw = ((double)exp_size) / *sec;
181
182   free(request_ack);
183   free(request);
184   if (measIn != measMasterIn)
185     gras_socket_close(measIn);
186   gras_socket_close(measMasterIn);
187   gras_socket_close(measOut);
188   return errcode;
189 }
190
191
192 /* Callback to the "BW handshake" message: 
193    opens a server measurement socket,
194    indicate its port in an "BW handshaked" message,
195    receive the corresponding data on the measurement socket, 
196    close the measurment socket
197
198    sizes are in byte (got converted from kb my expeditor)
199 */
200 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
201                             void          *payload) {
202   gras_socket_t measMasterIn,measIn,measOut;
203   bw_request_t request=*(bw_request_t*)payload;
204   bw_request_t answer;
205   xbt_error_t errcode;
206   int port;
207   
208   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
209         gras_socket_peer_name(expeditor),request->host.port,
210         request->buf_size,request->exp_size,request->msg_size);     
211
212   /* Build our answer */
213   answer = xbt_new0(s_bw_request_t,1);
214   
215   for (port = 6000, errcode = system_error;
216        errcode == system_error;
217        errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
218   if (errcode != no_error) {
219     ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
220     /* FIXME: tell error to remote */
221     return 1;
222   }
223    
224   answer->buf_size=request->buf_size;
225   answer->exp_size=request->exp_size;
226   answer->msg_size=request->msg_size;
227   answer->host.port=gras_socket_my_port(measMasterIn);
228
229   /* Don't connect asap to leave time to other side to enter the accept() */
230   if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
231                                       request->host.port,
232                                       request->buf_size,1,&measOut))) { 
233     ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d", 
234            xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
235     /* FIXME: tell error to remote */
236     return 1;
237   }
238
239
240   if ((errcode=gras_msg_send(expeditor,
241                              gras_msgtype_by_name("BW handshake ACK"),
242                              &answer))) {
243     ERROR1("Error %s encountered while sending the answer.",
244             xbt_error_name(errcode));
245     gras_socket_close(measMasterIn);
246     gras_socket_close(measOut);
247     /* FIXME: tell error to remote */
248     return 1;
249   }
250   TRY(gras_socket_meas_accept(measMasterIn,&measIn));
251   DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
252         answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
253
254   TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
255   TRY(gras_socket_meas_send(measOut,120,1,1));
256
257   /*catch
258     ERROR1("Error %s encountered while receiving the experiment.",
259             xbt_error_name(errcode));
260     gras_socket_close(measMasterIn);
261     gras_socket_close(measIn);
262     gras_socket_close(measOut);
263     * FIXME: tell error to remote ? *
264     return 1;
265     }*/
266
267   if (measIn != measMasterIn)
268     gras_socket_close(measMasterIn);
269   gras_socket_close(measIn);
270   gras_socket_close(measOut);
271   free(answer);
272   free(request);
273   DEBUG0("BW experiment done.");
274   return 1;
275 }
276
277 /**
278  * \brief request a bandwidth measurement between two remote hosts
279  *
280  * Results are reported in last args, and sizes are in kb.
281  */
282 xbt_error_t amok_bw_request(const char* from_name,unsigned int from_port,
283                             const char* to_name,unsigned int to_port,
284                             unsigned long int buf_size,
285                             unsigned long int exp_size,
286                             unsigned long int msg_size,
287                             /*OUT*/ double *sec, double*bw) {
288   
289   gras_socket_t sock;
290   xbt_error_t errcode;
291   /* The request */
292   bw_request_t request;
293   bw_res_t result;
294
295   request=xbt_new0(s_bw_request_t,1);
296   request->buf_size=buf_size;
297   request->exp_size=exp_size;
298   request->msg_size=msg_size;
299
300   request->host.name = (char*)to_name;
301   request->host.port = to_port;
302
303   TRY(gras_socket_client(from_name,from_port,&sock));
304   TRY(gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request));
305   free(request);
306
307   TRY(gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result));
308   
309   *sec=result->seconds;
310   *bw =result->bw;
311
312   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
313         from_name,from_port, to_name,to_port,
314         *sec,*bw);
315
316   gras_socket_close(sock);
317   return no_error;
318 }
319
320 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
321                           void            *payload) {
322   /* specification of the test to run, and our answer */
323   bw_request_t request = *(bw_request_t*)payload;
324   bw_res_t result = xbt_new0(s_bw_res_t,1);
325
326   TRY(gras_socket_client(request->to_name,request->to_port,&peer));
327   TRY(amok_bw_test(peer,
328                    request->buf_size,request->exp_size,request->msg_size,
329                    &(result->sec),&(result->bw)));
330   gras_socket_close(peer);
331   free(request);
332
333
334   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
335   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
336
337   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
338   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
339   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
340   /* our answer */
341   msgError_t *error;
342   msgResult_t *res;
343
344   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
345                                   &(res[0].value),&(res[1].value) ))) {
346     fprintf(stderr,
347             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
348             __FILE__,__LINE__,xbt_error_name(error->errcode));
349     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
350     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
351                    error,1,
352                    res,2);
353     return 1;
354   }
355   res[0].timestamp = (unsigned int) gras_time();
356   res[1].timestamp = (unsigned int) gras_time();
357   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
358                  error,1,
359                  res,2);
360   gras_msg_free(msg);
361   return 1;
362 }
363
364 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
365                          void             *payload) {
366    CRITICAL0("amok_bw_cb_sat_start; not implemented");
367    return 1;
368
369 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
370                          void             *payload) {
371    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
372    return 1;
373 }
374 #if 0
375 /* ***************************************************************************
376  * Link saturation
377  * ***************************************************************************/
378
379 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
380                                   const char* to_name,unsigned int to_port,
381                                   unsigned int msgSize, unsigned int timeout) {
382   gras_sock_t *sock;
383   xbt_error_t errcode;
384   /* The request */
385   SatExp_t *request;
386   msgHost_t *target;
387   /* answer */
388   gras_msg_t *answer;
389
390   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
391     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
392             __FILE__,__LINE__,xbt_error_name(errcode));
393     return errcode;
394   }
395   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
396       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
397     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
398     gras_sock_close(sock);
399     return malloc_error;    
400   }
401
402   request->timeout=timeout;
403   request->msgSize=msgSize;
404
405   strcpy(target->host,to_name);
406   target->port=to_port;
407
408   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
409                               target,1,
410                               request,1))) {
411     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
412             __FILE__,__LINE__,xbt_error_name(errcode));
413     gras_sock_close(sock);
414     return errcode;
415   }
416   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
417     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
418             __FILE__,__LINE__,xbt_error_name(errcode));
419     gras_sock_close(sock);
420     return errcode;
421   }
422
423   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
424     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
425             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
426     gras_msg_free(answer);
427     gras_sock_close(sock);
428     return errcode;
429   }
430
431   gras_msg_free(answer);
432   gras_sock_close(sock);
433   return no_error;
434 }
435
436 int grasbw_cbSatStart(gras_msg_t *msg) {
437   gras_rawsock_t *raw;
438   gras_sock_t *sock;
439   xbt_error_t errcode;
440   double start; /* time to timeout */
441
442   /* specification of the test to run */
443   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
444   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
445
446   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
447   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
448   unsigned int raw_port;
449
450   /* The request */
451   SatExp_t *request;
452   /* answer */
453   gras_msg_t *answer;
454
455   /*
456   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
457   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
458           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
459           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
460   */
461
462   /* Negociate the saturation with the peer */
463   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
464     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
465             xbt_error_name(errcode));
466     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
467                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
468                      errcode,"Cannot contact peer.\n");
469     return 1;
470   }
471   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
472     fprintf(stderr,"cbSatStart(): Malloc error\n");
473     gras_sock_close(sock);
474     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
475                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
476                      malloc_error,"Cannot build request.\n");
477     return 1;    
478   }
479
480   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
481   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
482
483   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
484                               request,1))) {
485     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
486             xbt_error_name(errcode));
487     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
488                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
489                      errcode,"Cannot send request.\n");
490     gras_sock_close(sock);
491     return 1;
492   }
493
494   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
495     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
496             xbt_error_name(errcode));
497     gras_sock_close(sock);
498
499     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
500                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
501                      errcode,
502                      "Cannot receive the ACK.\n");
503     return 1;
504   }
505
506   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
507     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
508             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
509
510     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
511                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
512                      errcode,
513                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
514     gras_msg_free(answer);
515     gras_sock_close(sock);
516     return 1;
517   }
518
519   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
520
521   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
522     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
523             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
524
525     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
526                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
527                      errcode,"Cannot open raw socket.\n");
528     gras_sock_close(sock);
529     return 1;
530   }
531
532   /* send a train of data before repporting that XP is started */
533   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
534     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
535     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
536                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
537                      errcode,"Cannot raw send.\n");
538     gras_sock_close(sock);
539     gras_rawsock_close(raw);
540     return 1;
541   }
542   
543   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
544                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
545                    no_error,"Saturation started");
546   gras_msg_free(answer);
547   gras_msg_free(msg);
548   
549   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
550   start=gras_time();
551   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
552          gras_time()-start < timeout) {
553     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
554       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
555       /* our error message do not interess anyone. SAT_STOP will do nothing. */
556       gras_sock_close(sock);
557       gras_rawsock_close(raw);
558       return 1;
559     } 
560   }
561   if (gras_time()-start > timeout) {
562     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
563     gras_sock_close(sock);
564     gras_rawsock_close(raw);
565     return 1;
566   }
567
568   /* Handle the SAT_STOP which broke the previous while */
569   
570   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
571     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
572
573     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
574                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
575                      errcode,"Sending SAT_END to peer failed.\n");
576     gras_sock_close(sock);
577     gras_rawsock_close(raw);
578     return 1;
579   }
580   
581   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
582     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
583
584     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
585                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
586                      errcode,"Receiving SAT_ENDED from peer failed.\n");
587     gras_sock_close(sock);
588     gras_rawsock_close(raw);
589     return 1;
590   }
591   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
592                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
593                    no_error,"");
594
595   gras_sock_close(sock);
596   gras_rawsock_close(raw);
597   gras_msg_free(answer);
598   gras_msg_free(msg);
599
600   return 1;  
601 }
602
603 int grasbw_cbSatBegin(gras_msg_t *msg) {
604   gras_rawsock_t *raw;
605   xbt_error_t errcode;
606   double start; /* timer */
607   /* request */
608   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
609   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
610   /* answer */
611   SatExp_t *request;
612   msgError_t *error;
613
614   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
615       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
616     fprintf(stderr,"cbSatBegin(): Malloc error\n");
617     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
618                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
619                      malloc_error,"Malloc error");
620     return 1;
621   }
622
623   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
624     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
625             xbt_error_name(errcode));
626     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
627                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
628                      errcode,"Cannot open raw socket");
629     return 1;
630   }
631   request->port=gras_rawsock_get_peer_port(raw);
632   request->msgSize=msgSize;
633   error->errcode=no_error;
634   error->errmsg[0]='\0';
635   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
636                               error,1,
637                               request,1))) {
638     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
639             xbt_error_name(errcode));
640     return 1;
641   }
642   gras_msg_free(msg);
643
644   start=gras_time();
645   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
646          gras_time() - start < timeout) {
647     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
648     if (errcode != timeout_error && errcode != no_error) {
649       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
650       /* our error message do not interess anyone. SAT_END will do nothing. */
651       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
652       return 1;
653     } 
654   }
655   if (gras_time()-start > timeout) {
656     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
657     gras_rawsock_close(raw);
658     return 1;
659   }
660
661   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
662                    "cbSatBegin: Cannot send SAT_ENDED.\n",
663                    no_error,"");
664   gras_rawsock_close(raw);
665   gras_msg_free(msg);
666   return 1;
667 }
668
669 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
670                                  const char* to_name,unsigned int to_port) {
671   xbt_error_t errcode;
672   gras_sock_t *sock;
673   gras_msg_t *answer;
674
675   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
676     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
677             xbt_error_name(errcode));
678     return errcode;
679   }
680
681   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
682     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
683             xbt_error_name(errcode));
684     gras_sock_close(sock);
685     return errcode;
686   }
687
688   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
689     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
690             xbt_error_name(errcode));
691     gras_sock_close(sock);
692     return errcode;
693   }
694
695   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
696     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
697             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
698     gras_msg_free(answer);
699     gras_sock_close(sock);
700     return errcode;
701   }
702
703   gras_msg_free(answer);
704   gras_sock_close(sock);
705
706   return no_error;
707 }
708 #endif