Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f206a41944c4b0aa395a3a1eeb2b25d61167e441
[simgrid.git] / src / amok / Bandwidth / bandwidth.c
1 /* $Id$ */
2
3 /* amok_bandwidth - Bandwidth tests facilities                              */
4
5 /* Copyright (c) 2003-5 Martin Quinson. All rights reserved.                */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "amok/Bandwidth/bandwidth_private.h"
11 #include "gras/messages.h"
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
14
15 static short _amok_bw_initialized = 0;
16
17 /**** code ****/
18 void amok_bw_init(void) {
19   gras_datadesc_type_t bw_request_desc, bw_res_desc, sat_request_desc;
20
21   amok_base_init();
22
23   if (! _amok_bw_initialized) {
24         
25      /* Build the datatype descriptions */ 
26      bw_request_desc = gras_datadesc_struct("s_bw_request_t");
27      gras_datadesc_struct_append(bw_request_desc,"host",
28                                  gras_datadesc_by_name("xbt_host_t"));
29      gras_datadesc_struct_append(bw_request_desc,"buf_size",
30                                  gras_datadesc_by_name("unsigned long int"));
31      gras_datadesc_struct_append(bw_request_desc,"exp_size",
32                                  gras_datadesc_by_name("unsigned long int"));
33      gras_datadesc_struct_append(bw_request_desc,"msg_size",
34                                  gras_datadesc_by_name("unsigned long int"));
35      gras_datadesc_struct_close(bw_request_desc);
36      bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
37
38      bw_res_desc = gras_datadesc_struct("s_bw_res_t");
39      gras_datadesc_struct_append(bw_res_desc,"err",gras_datadesc_by_name("s_amok_remoterr_t"));
40      gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
41      gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
42      gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
43      gras_datadesc_struct_close(bw_res_desc);
44      bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
45
46      sat_request_desc = gras_datadesc_struct("s_sat_request_desc_t");
47      gras_datadesc_struct_append(sat_request_desc,"host",gras_datadesc_by_name("xbt_host_t"));
48      gras_datadesc_struct_append(sat_request_desc,"msg_size",gras_datadesc_by_name("unsigned int"));
49      gras_datadesc_struct_append(sat_request_desc,"timeout",gras_datadesc_by_name("unsigned int"));
50      gras_datadesc_struct_close(sat_request_desc);
51      sat_request_desc = gras_datadesc_ref("sat_request_t",sat_request_desc);
52      
53      /* Register the bandwidth messages */
54      gras_msgtype_declare("BW request",       bw_request_desc);
55      gras_msgtype_declare("BW result",        bw_res_desc);
56      gras_msgtype_declare("BW handshake",     bw_request_desc);
57      gras_msgtype_declare("BW handshake ACK", bw_request_desc);
58      
59      /* Register the saturation messages */
60      gras_msgtype_declare("SAT start",   sat_request_desc);
61      gras_msgtype_declare("SAT started", gras_datadesc_by_name("amok_remoterr_t"));
62      gras_msgtype_declare("SAT begin",   sat_request_desc);
63      gras_msgtype_declare("SAT begun",   gras_datadesc_by_name("amok_remoterr_t"));
64      gras_msgtype_declare("SAT end",     NULL);
65      gras_msgtype_declare("SAT ended",   gras_datadesc_by_name("amok_remoterr_t"));
66      gras_msgtype_declare("SAT stop",    NULL);
67      gras_msgtype_declare("SAT stopped", gras_datadesc_by_name("amok_remoterr_t"));
68   }
69    
70   /* Register the callbacks */
71   gras_cb_register(gras_msgtype_by_name("BW request"),
72                    &amok_bw_cb_bw_request);
73   gras_cb_register(gras_msgtype_by_name("BW handshake"),
74                    &amok_bw_cb_bw_handshake);
75
76   gras_cb_register(gras_msgtype_by_name("SAT start"),
77                    &amok_bw_cb_sat_start);
78   gras_cb_register(gras_msgtype_by_name("SAT begin"),
79                    &amok_bw_cb_sat_begin);
80   
81   _amok_bw_initialized =1;
82 }
83
84 void amok_bw_exit(void) {
85   if (! _amok_bw_initialized)
86     return;
87    
88   gras_cb_unregister(gras_msgtype_by_name("BW request"),
89                      &amok_bw_cb_bw_request);
90   gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
91                      &amok_bw_cb_bw_handshake);
92
93   gras_cb_unregister(gras_msgtype_by_name("SAT start"),
94                      &amok_bw_cb_sat_start);
95   gras_cb_unregister(gras_msgtype_by_name("SAT begin"),
96                      &amok_bw_cb_sat_begin);
97
98   _amok_bw_initialized = 0;
99 }
100
101    
102
103 /* ***************************************************************************
104  * Bandwidth tests
105  * ***************************************************************************/
106
107 /**
108  * \brief bandwidth measurement between localhost and @peer
109  * 
110  * Results are reported in last args, and sizes are in kb.
111  */
112 xbt_error_t amok_bw_test(gras_socket_t peer,
113                          unsigned long int buf_size,
114                          unsigned long int exp_size,
115                          unsigned long int msg_size,
116                          /*OUT*/ double *sec, double *bw) {
117
118   /* Measurement sockets for the experiments */
119   gras_socket_t measMasterIn,measIn,measOut;
120   int port;
121   xbt_error_t errcode;
122   bw_request_t request,request_ack;
123   
124   for (port = 5000, errcode = system_error;
125        errcode == system_error && port < 10000;
126        errcode = gras_socket_server_ext(++port,buf_size,1,&measMasterIn));
127   if (errcode != no_error) {
128     ERROR1("Error %s encountered while opening a measurement socket",
129            xbt_error_name(errcode));
130     return errcode;
131   }
132         
133   request=xbt_new0(s_bw_request_t,1);
134   request->buf_size=buf_size*1024;
135   request->exp_size=exp_size*1024;
136   request->msg_size=msg_size*1024;
137   request->host.name = NULL;
138   request->host.port = gras_socket_my_port(measMasterIn);
139   VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld kb= %ld b)", 
140         gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
141         buf_size,request->buf_size);
142
143   if ((errcode=gras_msg_send(peer,gras_msgtype_by_name("BW handshake"),&request))) {
144     ERROR1("Error %s encountered while sending the BW request.", xbt_error_name(errcode));
145     return errcode;
146   }
147   TRY(gras_socket_meas_accept(measMasterIn,&measIn));
148
149   if ((errcode=gras_msg_wait(60,gras_msgtype_by_name("BW handshake ACK"),
150                              NULL,&request_ack))) {
151     ERROR1("Error %s encountered while waiting for the answer to BW request.\n",
152             xbt_error_name(errcode));
153     return errcode;
154   }
155    
156   /* FIXME: What if there is a remote error? */
157    
158   if((errcode=gras_socket_client_ext(gras_socket_peer_name(peer),
159                                      request_ack->host.port, 
160                                      request->buf_size,1,&measOut))) {
161     ERROR3("Error %s encountered while opening the measurement socket to %s:%d for BW test\n",
162             xbt_error_name(errcode),gras_socket_peer_name(peer),request_ack->host.port);
163     return errcode;
164   }
165   DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
166
167   *sec=gras_os_time();
168   TRY(gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size));
169   TRY(gras_socket_meas_recv(measIn,120,1,1));
170
171   /*catch
172     ERROR1("Error %s encountered while sending the BW experiment.",
173             xbt_error_name(errcode));
174     gras_socket_close(measOut);
175     gras_socket_close(measMasterIn);
176     gras_socket_close(measIn);
177   */
178
179   *sec = gras_os_time() - *sec;
180   *bw = ((double)exp_size) / *sec;
181
182   free(request_ack);
183   free(request);
184   if (measIn != measMasterIn)
185     gras_socket_close(measIn);
186   gras_socket_close(measMasterIn);
187   gras_socket_close(measOut);
188   return errcode;
189 }
190
191
192 /* Callback to the "BW handshake" message: 
193    opens a server measurement socket,
194    indicate its port in an "BW handshaked" message,
195    receive the corresponding data on the measurement socket, 
196    close the measurment socket
197
198    sizes are in byte (got converted from kb my expeditor)
199 */
200 int amok_bw_cb_bw_handshake(gras_socket_t  expeditor,
201                             void          *payload) {
202   gras_socket_t measMasterIn,measIn,measOut;
203   bw_request_t request=*(bw_request_t*)payload;
204   bw_request_t answer;
205   xbt_error_t errcode;
206   int port;
207   
208   VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
209         gras_socket_peer_name(expeditor),request->host.port,
210         request->buf_size,request->exp_size,request->msg_size);     
211
212   /* Build our answer */
213   answer = xbt_new0(s_bw_request_t,1);
214   
215   for (port = 6000, errcode = system_error;
216        errcode == system_error;
217        errcode = gras_socket_server_ext(++port,request->buf_size,1,&measMasterIn));
218   if (errcode != no_error) {
219     ERROR1("Error %s encountered while opening a measurement server socket", xbt_error_name(errcode));
220     /* FIXME: tell error to remote */
221     return 1;
222   }
223    
224   answer->buf_size=request->buf_size;
225   answer->exp_size=request->exp_size;
226   answer->msg_size=request->msg_size;
227   answer->host.port=gras_socket_my_port(measMasterIn);
228
229   /* Don't connect asap to leave time to other side to enter the accept() */
230   if ((errcode=gras_socket_client_ext(gras_socket_peer_name(expeditor),
231                                       request->host.port,
232                                       request->buf_size,1,&measOut))) { 
233     ERROR3("Error '%s' encountered while opening a measurement socket back to %s:%d", 
234            xbt_error_name(errcode),gras_socket_peer_name(expeditor),request->host.port);
235     /* FIXME: tell error to remote */
236     return 1;
237   }
238
239
240   if ((errcode=gras_msg_send(expeditor,
241                              gras_msgtype_by_name("BW handshake ACK"),
242                              &answer))) {
243     ERROR1("Error %s encountered while sending the answer.",
244             xbt_error_name(errcode));
245     gras_socket_close(measMasterIn);
246     gras_socket_close(measOut);
247     /* FIXME: tell error to remote */
248     return 1;
249   }
250   TRY(gras_socket_meas_accept(measMasterIn,&measIn));
251   DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
252         answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
253
254   TRY(gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size));
255   TRY(gras_socket_meas_send(measOut,120,1,1));
256
257   /*catch
258     ERROR1("Error %s encountered while receiving the experiment.",
259             xbt_error_name(errcode));
260     gras_socket_close(measMasterIn);
261     gras_socket_close(measIn);
262     gras_socket_close(measOut);
263     * FIXME: tell error to remote ? *
264     return 1;
265     }*/
266
267   if (measIn != measMasterIn)
268     gras_socket_close(measMasterIn);
269   gras_socket_close(measIn);
270   gras_socket_close(measOut);
271   free(answer);
272   free(request);
273   DEBUG0("BW experiment done.");
274   return 1;
275 }
276
277 /**
278  * \brief request a bandwidth measurement between two remote hosts
279  *
280  * Results are reported in last args, and sizes are in kb.
281  */
282 xbt_error_t amok_bw_request(const char* from_name,unsigned int from_port,
283                             const char* to_name,unsigned int to_port,
284                             unsigned long int buf_size,
285                             unsigned long int exp_size,
286                             unsigned long int msg_size,
287                             /*OUT*/ double *sec, double*bw) {
288   
289   gras_socket_t sock;
290   xbt_error_t errcode;
291   /* The request */
292   bw_request_t request;
293   bw_res_t result;
294
295   request=xbt_new0(s_bw_request_t,1);
296   request->buf_size=buf_size;
297   request->exp_size=exp_size;
298   request->msg_size=msg_size;
299
300   request->host.name = (char*)to_name;
301   request->host.port = to_port;
302
303   TRY(gras_socket_client(from_name,from_port,&sock));
304   TRY(gras_msg_send(sock,gras_msgtype_by_name("BW request"),&request));
305   free(request);
306
307   TRY(gras_msg_wait(240,gras_msgtype_by_name("BW result"),NULL, &result));
308   
309   *sec=result->sec;
310   *bw =result->bw;
311
312   VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
313         from_name,from_port, to_name,to_port,
314         *sec,*bw);
315
316   gras_socket_close(sock);
317   return no_error;
318 }
319
320 int amok_bw_cb_bw_request(gras_socket_t    expeditor,
321                           void            *payload) {
322                           
323   xbt_error_t errcode;                    
324   /* specification of the test to run, and our answer */
325   bw_request_t request = *(bw_request_t*)payload;
326   bw_res_t result = xbt_new0(s_bw_res,1);
327   gras_socket_t peer;
328
329   TRY(gras_socket_client(request->host.name,request->host.port,&peer));
330   TRY(amok_bw_test(peer,
331                    request->buf_size,request->exp_size,request->msg_size,
332                    &(result->sec),&(result->bw)));
333   gras_socket_close(peer);
334   free(request);
335   
336   return 1;
337
338 #if 0
339   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
340   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
341
342   unsigned int bufSize=gras_msg_ctn(msg,1,0,BwExp_t).bufSize;
343   unsigned int expSize=gras_msg_ctn(msg,1,0,BwExp_t).expSize;
344   unsigned int msgSize=gras_msg_ctn(msg,1,0,BwExp_t).msgSize;
345   /* our answer */
346   msgError_t *error;
347   msgResult_t *res;
348
349   if ((error->errcode=grasbw_test(to_name,to_port,bufSize,expSize,msgSize,
350                                   &(res[0].value),&(res[1].value) ))) {
351     fprintf(stderr,
352             "%s:%d:grasbw_cbRequest: Error %s encountered while doing the test\n",
353             __FILE__,__LINE__,xbt_error_name(error->errcode));
354     strncpy(error->errmsg,"Error within grasbw_test",ERRMSG_LEN);
355     gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
356                    error,1,
357                    res,2);
358     return 1;
359   }
360   res[0].timestamp = (unsigned int) gras_time();
361   res[1].timestamp = (unsigned int) gras_time();
362   gras_msg_new_and_send(msg->sock,GRASMSG_BW_RESULT,2,
363                  error,1,
364                  res,2);
365   gras_msg_free(msg);
366   return 1;
367 #endif
368 }
369
370 int amok_bw_cb_sat_start(gras_socket_t     expeditor,
371                          void             *payload) {
372    CRITICAL0("amok_bw_cb_sat_start; not implemented");
373    return 1;
374
375 int amok_bw_cb_sat_begin(gras_socket_t     expeditor,
376                          void             *payload) {
377    CRITICAL0("amok_bw_cb_sat_begin: not implemented");
378    return 1;
379 }
380 #if 0
381 /* ***************************************************************************
382  * Link saturation
383  * ***************************************************************************/
384
/* Disabled legacy code (inside #if 0), written against an older GRAS API.
 *
 * Asks the process at from_name:from_port to start a saturation experiment
 * towards to_name:to_port:
 *  - opens a client socket to from_name:from_port;
 *  - sends a GRASMSG_SAT_START message carrying the target host and the
 *    request (timeout and msgSize);
 *  - waits (120) for the GRASMSG_SAT_STARTED answer.
 * Returns the error of the failing local step, or the error code found in
 * the answer, or no_error.
 *
 * NOTE(review): request and target are never released in this function;
 * whether gras_msg_new_and_send() takes ownership of them is not visible
 * from here -- confirm. If the second malloc fails, request is not freed.
 * NOTE(review): strcpy() into target->host is unbounded -- confirm that the
 * field is large enough for any to_name.
 */
385 xbt_error_t grasbw_saturate_start(const char* from_name,unsigned int from_port,
386                                   const char* to_name,unsigned int to_port,
387                                   unsigned int msgSize, unsigned int timeout) {
388   gras_sock_t *sock;
389   xbt_error_t errcode;
390   /* The request */
391   SatExp_t *request;
392   msgHost_t *target;
393   /* answer */
394   gras_msg_t *answer;
395
396   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
397     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while contacting peer\n",
398             __FILE__,__LINE__,xbt_error_name(errcode));
399     return errcode;
400   }
401   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t))) ||
402       !(target=(msgHost_t*)malloc(sizeof(msgHost_t)))) {
403     fprintf(stderr,"%s:%d:saturate_start(): Malloc error\n",__FILE__,__LINE__);
404     gras_sock_close(sock);
405     return malloc_error;    
406   }
407
408   request->timeout=timeout;
409   request->msgSize=msgSize;
410
411   strcpy(target->host,to_name);
412   target->port=to_port;
413
414   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_START, 2, 
415                               target,1,
416                               request,1))) {
417     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while sending the request.\n",
418             __FILE__,__LINE__,xbt_error_name(errcode));
419     gras_sock_close(sock);
420     return errcode;
421   }
422   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STARTED,&answer))) {
423     fprintf(stderr,"%s:%d:saturate_start(): Error %s encountered while waiting for the ACK.\n",
424             __FILE__,__LINE__,xbt_error_name(errcode));
425     gras_sock_close(sock);
426     return errcode;
427   }
428
      /* The answer carries the status reported by the peer */
429   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
430     fprintf(stderr,"%s:%d:saturate_start(): Peer reported error %s (%s).\n",
431             __FILE__,__LINE__,xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
432     gras_msg_free(answer);
433     gras_sock_close(sock);
434     return errcode;
435   }
436
437   gras_msg_free(answer);
438   gras_sock_close(sock);
439   return no_error;
440 }
441
/* Disabled legacy code (inside #if 0), written against an older GRAS API.
 *
 * Handler of the SAT_START message: saturates the link towards the host
 * named in the message.
 *  - contacts to_name:to_port and sends it a GRASMSG_SAT_BEGIN request;
 *  - waits (120) for GRASMSG_SAT_BEGUN and reads the raw port from it;
 *  - opens a raw client socket to that port and sends a first train of data;
 *  - reports GRASMSG_SAT_STARTED to the requester;
 *  - keeps sending raw data until a GRASMSG_SAT_STOP message arrives or
 *    until `timeout` has elapsed since the start;
 *  - on SAT_STOP, sends GRASMSG_SAT_END to the peer, waits (60) for
 *    GRASMSG_SAT_ENDED and reports GRASMSG_SAT_STOPPED to the requester.
 * Every failure is reported to the requester through grasRepportError()
 * where the code below does so. The function returns 1 on all paths.
 *
 * NOTE(review): several failure branches return without gras_msg_free() on
 * answer and/or msg (e.g. after a failed raw socket opening) -- confirm
 * whether these are leaks under the old API's ownership rules.
 * NOTE(review): request is not released here; ownership after
 * gras_msg_new_and_send() is not visible from this file -- confirm.
 */
442 int grasbw_cbSatStart(gras_msg_t *msg) {
443   gras_rawsock_t *raw;
444   gras_sock_t *sock;
445   xbt_error_t errcode;
446   double start; /* time to timeout */
447
448   /* specification of the test to run */
449   char* to_name=gras_msg_ctn(msg,0,0,msgHost_t).host;
450   unsigned int to_port=gras_msg_ctn(msg,0,0,msgHost_t).port;
451
452   unsigned int msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
453   unsigned int timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
454   unsigned int raw_port;
455
456   /* The request */
457   SatExp_t *request;
458   /* answer */
459   gras_msg_t *answer;
460
461   /*
462   fprintf(stderr,"grasbw_cbSatStart(sd=%p)\n",msg->sock);
463   fprintf(stderr,"(server=%d,raw=%d,fromPID=%d,toPID=%d,toHost=%p,toPort=%d,toChan=%d)\n",
464           msg->sock->server_sock,msg->sock->raw_sock,msg->sock->from_PID,
465           msg->sock->to_PID,msg->sock->to_host,msg->sock->to_port,msg->sock->to_chan);
466   */
467
468   /* Negotiate the saturation with the peer */
469   if((errcode=gras_sock_client_open(to_name,to_port,&sock))) {
470     fprintf(stderr,"cbSatStart(): Error %s encountered while contacting peer\n",
471             xbt_error_name(errcode));
472     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
473                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
474                      errcode,"Cannot contact peer.\n");
475     return 1;
476   }
477   if (!(request=(SatExp_t *)malloc(sizeof(SatExp_t)))) {
478     fprintf(stderr,"cbSatStart(): Malloc error\n");
479     gras_sock_close(sock);
480     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
481                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
482                      malloc_error,"Cannot build request.\n");
483     return 1;    
484   }
485
      /* Forward the experiment parameters we received to the peer */
486   request->timeout=gras_msg_ctn(msg,1,0,SatExp_t).timeout;
487   request->msgSize=gras_msg_ctn(msg,1,0,SatExp_t).msgSize;
488
489   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_BEGIN, 1, 
490                               request,1))) {
491     fprintf(stderr,"cbSatStart(): Error %s encountered while sending the request.\n",
492             xbt_error_name(errcode));
493     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
494                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
495                      errcode,"Cannot send request.\n");
496     gras_sock_close(sock);
497     return 1;
498   }
499
500   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_BEGUN,&answer))) {
501     fprintf(stderr,"cbSatStart(): Error %s encountered while waiting for the ACK.\n",
502             xbt_error_name(errcode));
503     gras_sock_close(sock);
504
505     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
506                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
507                      errcode,
508                      "Cannot receive the ACK.\n");
509     return 1;
510   }
511
512   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
513     fprintf(stderr,"cbSatStart(): Peer reported error %s (%s).\n",
514             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
515
516     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
517                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
518                      errcode,
519                      "Peer repported '%s'.\n",gras_msg_ctn(answer,0,0,msgError_t).errmsg);
520     gras_msg_free(answer);
521     gras_sock_close(sock);
522     return 1;
523   }
524
      /* The peer tells us in its answer on which port its raw socket listens */
525   raw_port=gras_msg_ctn(answer,1,0,SatExp_t).port;
526
527   if ((errcode=gras_rawsock_client_open(to_name,raw_port,msgSize,&raw))) {
528     fprintf(stderr,"cbSatStart(): Error %s while opening raw socket to %s:%d.\n",
529             xbt_error_name(errcode),to_name,gras_msg_ctn(answer,1,0,SatExp_t).port);
530
531     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
532                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
533                      errcode,"Cannot open raw socket.\n");
534     gras_sock_close(sock);
535     return 1;
536   }
537
538   /* send a train of data before reporting that XP is started */
539   if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
540     fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
541     grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
542                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
543                      errcode,"Cannot raw send.\n");
544     gras_sock_close(sock);
545     gras_rawsock_close(raw);
546     return 1;
547   }
548   
      /* Success is reported through the same helper, with no_error as status */
549   grasRepportError(msg->sock,GRASMSG_SAT_STARTED,1,
550                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
551                    no_error,"Saturation started");
552   gras_msg_free(answer);
553   gras_msg_free(msg);
554   
555   /* Do the saturation until we get a SAT_STOP message or until we timeout the whole XP*/
      /* msg was freed above; it is reused to receive the SAT_STOP message */
556   start=gras_time();
557   while (gras_msg_wait(0,GRASMSG_SAT_STOP,&msg)==timeout_error && 
558          gras_time()-start < timeout) {
559     if ((errcode=gras_rawsock_send(raw,msgSize,msgSize))) {
560       fprintf(stderr,"cbSatStart: Failure %s during raw send\n",xbt_error_name(errcode));
561       /* our error message is of no interest to anyone. SAT_STOP will do nothing. */
562       gras_sock_close(sock);
563       gras_rawsock_close(raw);
564       return 1;
565     } 
566   }
567   if (gras_time()-start > timeout) {
568     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW\n");
569     gras_sock_close(sock);
570     gras_rawsock_close(raw);
571     return 1;
572   }
573
574   /* Handle the SAT_STOP which broke the previous while */
575   
576   if ((errcode=gras_msg_new_and_send(sock, GRASMSG_SAT_END,0))) {
577     fprintf(stderr,"cbSatStart(): Cannot tell peer to stop saturation\n");
578
579     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
580                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
581                      errcode,"Sending SAT_END to peer failed.\n");
582     gras_sock_close(sock);
583     gras_rawsock_close(raw);
584     return 1;
585   }
586   
587   if ((errcode=gras_msg_wait(60,GRASMSG_SAT_ENDED,&answer))) {
588     fprintf(stderr,"cbSatStart(): Peer didn't ACK the end\n");
589
590     grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
591                      "cbSatStart: Severe error: Cannot send error status to requester!!\n",
592                      errcode,"Receiving SAT_ENDED from peer failed.\n");
593     gras_sock_close(sock);
594     gras_rawsock_close(raw);
595     return 1;
596   }
597   grasRepportError(msg->sock,GRASMSG_SAT_STOPPED,1,
598                    "cbSatStart: Severe error: Cannot send error status to requester!!\n",
599                    no_error,"");
600
601   gras_sock_close(sock);
602   gras_rawsock_close(raw);
603   gras_msg_free(answer);
604   gras_msg_free(msg);
605
606   return 1;  
607 }
608
/* Disabled legacy code (inside #if 0), written against an older GRAS API.
 *
 * Handler of the SAT_BEGIN message: receiving side of a saturation.
 *  - opens a raw server socket with gras_rawsock_server_open(6666,8000,...)
 *    (presumably a port range -- confirm against the old API);
 *  - answers GRASMSG_SAT_BEGUN with a no_error status and the raw port;
 *  - receives raw data until a GRASMSG_SAT_END message arrives or until
 *    `timeout` has elapsed since the start;
 *  - on SAT_END, reports GRASMSG_SAT_ENDED and closes the raw socket.
 * The function returns 1 on all paths.
 *
 * NOTE(review): the branches taken when sending the ACK fails or when the
 * raw receive fails return without calling gras_rawsock_close(raw) --
 * confirm whether the socket leaks there.
 * NOTE(review): request and error are not released here, and if the second
 * malloc fails request is not freed; ownership after
 * gras_msg_new_and_send() is not visible from this file -- confirm.
 */
609 int grasbw_cbSatBegin(gras_msg_t *msg) {
610   gras_rawsock_t *raw;
611   xbt_error_t errcode;
612   double start; /* timer */
613   /* request */
614   unsigned int msgSize=gras_msg_ctn(msg,0,0,SatExp_t).msgSize;
615   unsigned int timeout=gras_msg_ctn(msg,0,0,SatExp_t).timeout;
616   /* answer */
617   SatExp_t *request;
618   msgError_t *error;
619
620   if (!(request=(SatExp_t*)malloc(sizeof(SatExp_t))) ||
621       !(error=(msgError_t *)malloc(sizeof(msgError_t)))) {
622     fprintf(stderr,"cbSatBegin(): Malloc error\n");
623     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
624                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
625                      malloc_error,"Malloc error");
626     return 1;
627   }
628
629   if ((errcode=gras_rawsock_server_open(6666,8000,msgSize,&raw))) { 
630     fprintf(stderr,"cbSatBegin(): Error %s encountered while opening a raw socket\n",
631             xbt_error_name(errcode));
632     grasRepportError(msg->sock,GRASMSG_SAT_BEGUN,2,
633                      "cbSatBegin: Severe error: Cannot send error status to requester!!\n",
634                      errcode,"Cannot open raw socket");
635     return 1;
636   }
      /* Build the positive answer: empty error plus the port to send data to */
637   request->port=gras_rawsock_get_peer_port(raw);
638   request->msgSize=msgSize;
639   error->errcode=no_error;
640   error->errmsg[0]='\0';
641   if ((errcode=gras_msg_new_and_send(msg->sock,GRASMSG_SAT_BEGUN,2,
642                               error,1,
643                               request,1))) {
644     fprintf(stderr,"cbSatBegin(): Error %s encountered while send ACK to peer\n",
645             xbt_error_name(errcode));
646     return 1;
647   }
648   gras_msg_free(msg);
649
      /* msg was freed above; it is reused to receive the SAT_END message */
650   start=gras_time();
651   while (gras_msg_wait(0,GRASMSG_SAT_END,&msg)==timeout_error &&
652          gras_time() - start < timeout) {
653     errcode=gras_rawsock_recv(raw,msgSize,msgSize,1);
654     if (errcode != timeout_error && errcode != no_error) {
655       fprintf(stderr,"cbSatBegin: Failure %s during raw receive\n",xbt_error_name(errcode));
656       /* our error message is of no interest to anyone. SAT_END will do nothing. */
657       /* (if timeout'ed, it may be because the sender stopped emission. so survive it) */
658       return 1;
659     } 
660   }
661   if (gras_time()-start > timeout) {
662     fprintf(stderr,"The saturation experiment did timeout. Stop it NOW.\n");
663     gras_rawsock_close(raw);
664     return 1;
665   }
666
667   grasRepportError(msg->sock,GRASMSG_SAT_ENDED,1,
668                    "cbSatBegin: Cannot send SAT_ENDED.\n",
669                    no_error,"");
670   gras_rawsock_close(raw);
671   gras_msg_free(msg);
672   return 1;
673 }
674
/* Disabled legacy code (inside #if 0), written against an older GRAS API.
 *
 * Asks the process at from_name:from_port to stop its saturation
 * experiment:
 *  - opens a client socket to from_name:from_port;
 *  - sends an empty GRASMSG_SAT_STOP message;
 *  - waits (120) for the GRASMSG_SAT_STOPPED answer.
 * Returns the error of the failing local step, or the error code found in
 * the answer, or no_error.
 *
 * to_name and to_port are not used by the body.
 */
675 xbt_error_t grasbw_saturate_stop(const char* from_name,unsigned int from_port,
676                                  const char* to_name,unsigned int to_port) {
677   xbt_error_t errcode;
678   gras_sock_t *sock;
679   gras_msg_t *answer;
680
681   if((errcode=gras_sock_client_open(from_name,from_port,&sock))) {
682     fprintf(stderr,"saturate_stop(): Error %s encountered while contacting peer\n",
683             xbt_error_name(errcode));
684     return errcode;
685   }
686
687   if ((errcode=gras_msg_new_and_send(sock,GRASMSG_SAT_STOP,0))) {
688     fprintf(stderr,"saturate_stop(): Error %s encountered while sending request\n",
689             xbt_error_name(errcode));
690     gras_sock_close(sock);
691     return errcode;
692   }
693
694   if ((errcode=gras_msg_wait(120,GRASMSG_SAT_STOPPED,&answer))) {
695     fprintf(stderr,"saturate_stop(): Error %s encountered while receiving ACK\n",
696             xbt_error_name(errcode));
697     gras_sock_close(sock);
698     return errcode;
699   }
700
      /* The answer carries the status reported by the peer */
701   if((errcode=gras_msg_ctn(answer,0,0,msgError_t).errcode)) {
702     fprintf(stderr,"saturate_stop(): Peer reported error %s (%s).\n",
703             xbt_error_name(errcode),gras_msg_ctn(answer,0,0,msgError_t).errmsg);
704     gras_msg_free(answer);
705     gras_sock_close(sock);
706     return errcode;
707   }
708
709   gras_msg_free(answer);
710   gras_sock_close(sock);
711
712   return no_error;
713 }
714 #endif